You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/17 11:02:01 UTC

[inlong] branch master updated: [INLONG-7413][Audit] Audit get MQ config from Manager (#7629)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 01dfd9d65 [INLONG-7413][Audit] Audit get MQ config from Manager (#7629)
01dfd9d65 is described below

commit 01dfd9d6557de91472b52583d81c1ceda50661f5
Author: haifxu <xh...@gmail.com>
AuthorDate: Fri Mar 17 19:01:55 2023 +0800

    [INLONG-7413][Audit] Audit get MQ config from Manager (#7629)
---
 .../inlong/audit/consts/ConfigConstants.java       |   4 +
 .../apache/inlong/audit/file/ConfigManager.java    |  61 ++++++++----
 .../apache/inlong/audit/file/RemoteConfigJson.java |  35 ++-----
 .../org/apache/inlong/audit/node/Application.java  |  22 +++--
 .../org/apache/inlong/audit/sink/KafkaSink.java    |  15 ++-
 .../org/apache/inlong/audit/sink/PulsarSink.java   |   5 +-
 .../org/apache/inlong/audit/sink/TubeSink.java     |  17 +++-
 .../audit/sink/pulsar/PulsarClientService.java     |  13 ++-
 .../apache/inlong/audit/sink/KafkaSinkTest.java    |   1 -
 .../apache/inlong/audit/sink/PulsarSinkTest.java   |  15 +--
 .../org/apache/inlong/audit/sink/TubeSinkTest.java |   1 -
 .../audit/service/AuditMsgConsumerServer.java      | 109 +++++++++++++++++++--
 inlong-audit/conf/application.properties           |   7 +-
 inlong-audit/conf/audit-proxy-kafka.conf           |   2 -
 inlong-audit/conf/audit-proxy-pulsar.conf          |   2 -
 inlong-audit/conf/audit-proxy-tubemq.conf          |   2 -
 .../inlong/common/pojo/audit/AuditConfig.java      |  42 ++------
 .../common/pojo/audit/AuditConfigRequest.java      |  43 ++------
 .../apache/inlong/common/pojo/audit/MQInfo.java    |  44 +++------
 .../service/cluster/InlongClusterService.java      |   9 ++
 .../service/cluster/InlongClusterServiceImpl.java  |  23 +++++
 .../web/controller/openapi/AuditController.java    |  53 ++++++++++
 22 files changed, 342 insertions(+), 183 deletions(-)

diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
index 4c8b93671..987a83a0d 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
@@ -66,4 +66,8 @@ public class ConfigConstants {
     public static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 4000000L;
 
     public static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
+
+    public static final String MANAGER_PATH = "/inlong/manager/openapi";
+
+    public static final String MANAGER_GET_CONFIG_PATH = "/audit/getConfig";
 }
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
index 80fe5a642..a86a59502 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
@@ -22,14 +22,20 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
+import org.apache.inlong.audit.consts.ConfigConstants;
 import org.apache.inlong.audit.file.holder.PropertiesConfigHolder;
+import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
+import org.apache.inlong.common.pojo.audit.MQInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,9 +48,11 @@ public class ConfigManager {
     private static final Map<String, ConfigHolder> holderMap =
             new ConcurrentHashMap<>();
 
+    public static List<MQInfo> mqInfoList = new ArrayList<>();
+
     private static ConfigManager instance = null;
 
-    private static String DEFAULT_CONFIG_PROPERTIES = "server.properties";
+    private static String DEFAULT_CONFIG_PROPERTIES = "application.properties";
 
     static {
         instance = getInstance(DEFAULT_CONFIG_PROPERTIES, true);
@@ -123,6 +131,10 @@ public class ConfigManager {
         }
     }
 
+    public List<MQInfo> getMqInfoList() {
+        return mqInfoList;
+    }
+
     public ConfigHolder getDefaultConfigHolder() {
         return holderMap.get(DEFAULT_CONFIG_PROPERTIES);
     }
@@ -189,33 +201,41 @@ public class ConfigManager {
             }
         }
 
-        private boolean checkWithManager(String host) {
-            HttpGet httpGet = null;
+        private boolean checkWithManager(String host, String clusterTag) {
+            HttpPost httpPost = null;
             try {
-                String url = "http://" + host + "/inlong/manager/openapi/audit/getConfig";
+                String url = "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_CONFIG_PATH;
                 LOG.info("start to request {} to get config info", url);
-                httpGet = new HttpGet(url);
-                httpGet.addHeader(HttpHeaders.CONNECTION, "close");
+                httpPost = new HttpPost(url);
+                httpPost.addHeader(HttpHeaders.CONNECTION, "close");
+
+                // request body
+                AuditConfigRequest request = new AuditConfigRequest();
+                request.setClusterTag(clusterTag);
+                StringEntity stringEntity = new StringEntity(gson.toJson(request));
+                stringEntity.setContentType("application/json");
+                httpPost.setEntity(stringEntity);
 
                 // request with post
-                CloseableHttpResponse response = httpClient.execute(httpGet);
+                LOG.info("start to request {} to get config info with params {}", url, request);
+                CloseableHttpResponse response = httpClient.execute(httpPost);
                 String returnStr = EntityUtils.toString(response.getEntity());
                 // get groupId <-> topic and m value.
 
-                Map<String, String> configJsonMap = gson.fromJson(returnStr, Map.class);
-                if (configJsonMap != null && configJsonMap.size() > 0) {
-                    for (Entry<String, String> entry : configJsonMap.entrySet()) {
-                        Map<String, String> valueMap = gson.fromJson(entry.getValue(), Map.class);
-                        configManager.updatePropertiesHolder(valueMap,
-                                entry.getKey(), true);
+                RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
+                if (configJson.isSuccess() && configJson.getData() != null) {
+                    mqInfoList = configJson.getData().getMqInfoList();
+                    if (mqInfoList == null || mqInfoList.isEmpty()) {
+                        LOG.error("getConfig from manager: no available mq config");
+                        return false;
                     }
                 }
             } catch (Exception ex) {
                 LOG.error("exception caught", ex);
                 return false;
             } finally {
-                if (httpGet != null) {
-                    httpGet.releaseConnection();
+                if (httpPost != null) {
+                    httpPost.releaseConnection();
                 }
             }
             return true;
@@ -224,10 +244,13 @@ public class ConfigManager {
         private void checkRemoteConfig() {
 
             try {
-                String managerHosts = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("manager_hosts");
+                String managerHosts = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("manager.hosts");
+                String proxyClusterTag = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES)
+                        .get("proxy.cluster.tag");
+                LOG.info("manager url: {}", managerHosts);
                 String[] hostList = StringUtils.split(managerHosts, ",");
                 for (String host : hostList) {
-                    if (checkWithManager(host)) {
+                    if (checkWithManager(host, proxyClusterTag)) {
                         break;
                     }
                 }
@@ -242,7 +265,6 @@ public class ConfigManager {
             while (isRunning) {
 
                 long sleepTimeInMs = getSleepTime();
-                count += 1;
                 try {
                     checkLocalFile();
                     // wait for 30 seconds to update remote config
@@ -254,6 +276,7 @@ public class ConfigManager {
                 } catch (Exception ex) {
                     LOG.error("exception caught", ex);
                 }
+                count += 1;
             }
         }
     }
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
index 9172ff736..8ab17c7af 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
@@ -17,38 +17,23 @@
 
 package org.apache.inlong.audit.file;
 
-import java.util.List;
+import org.apache.inlong.common.pojo.audit.AuditConfig;
 
 public class RemoteConfigJson {
 
-    private boolean result;
-    private List<DataItem> data;
-    private int errCode;
+    private boolean success;
+    private String errMsg;
+    private AuditConfig data;
 
-    public List<DataItem> getData() {
-        return data;
+    public boolean isSuccess() {
+        return success;
     }
 
-    public int getErrCode() {
-        return errCode;
+    public String getErrMsg() {
+        return errMsg;
     }
 
-    public static class DataItem {
-
-        private String groupId;
-        private String topic;
-        private String m;
-
-        public String getGroupId() {
-            return groupId;
-        }
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public String getM() {
-            return m;
-        }
+    public AuditConfig getData() {
+        return data;
     }
 }
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
index c407d8716..732dae977 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -21,15 +21,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -52,9 +43,20 @@ import org.apache.flume.node.MaterializedConfiguration;
 import org.apache.flume.node.PollingPropertiesFileConfigurationProvider;
 import org.apache.flume.node.PropertiesFileConfigurationProvider;
 import org.apache.flume.util.SSLUtil;
+import org.apache.inlong.audit.file.ConfigManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * 
  * Application
@@ -294,6 +296,8 @@ public class Application {
 
             Application application;
 
+            ConfigManager.getInstance();
+
             File configurationFile = new File(commandLine.getOptionValue('f'));
 
             // The following is to ensure that by default the agent will fail on startup
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
index 1f357bec5..a4b67c9ed 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
@@ -28,7 +28,10 @@ import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.file.ConfigManager;
 import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.audit.MQInfo;
 import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -40,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executors;
@@ -54,6 +58,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
 
     // for kafka producer
     private static Properties properties = new Properties();
+    private String kafkaServerUrl;
     private static final String BOOTSTRAP_SERVER = "bootstrap_servers";
     private static final String TOPIC = "topic";
     private static final String RETRIES = "retries";
@@ -247,7 +252,6 @@ public class KafkaSink extends AbstractSink implements Configurable {
         localIp = NetworkUtils.getLocalIp();
 
         properties = new Properties();
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getString(BOOTSTRAP_SERVER));
         properties.put(ProducerConfig.ACKS_CONFIG, defaultAcks);
         properties.put(ProducerConfig.RETRIES_CONFIG, context.getString(RETRIES, defaultRetries));
         properties.put(ProducerConfig.BATCH_SIZE_CONFIG, context.getString(BATCH_SIZE, defaultBatchSize));
@@ -256,6 +260,15 @@ public class KafkaSink extends AbstractSink implements Configurable {
     }
 
     private void initTopicProducer(String topic) {
+        ConfigManager configManager = ConfigManager.getInstance();
+        List<MQInfo> mqInfoList = configManager.getMqInfoList();
+        mqInfoList.forEach(mqClusterInfo -> {
+            if (MQType.KAFKA.equals(mqClusterInfo.getMqType())) {
+                kafkaServerUrl = mqClusterInfo.getUrl();
+            }
+        });
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
+
         if (StringUtils.isEmpty(topic)) {
             logger.error("topic is empty");
         }
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
index e374b5de2..30b388097 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
@@ -148,6 +148,8 @@ public class PulsarSink extends AbstractSink
 
     private String topic;
 
+    private Context context;
+
     static {
         /*
          * stat pulsar performance
@@ -168,6 +170,7 @@ public class PulsarSink extends AbstractSink
      */
     public void configure(Context context) {
         logger.info("PulsarSink started and context = {}", context.toString());
+        this.context = context;
         /*
          * topic config
          */
@@ -188,7 +191,6 @@ public class PulsarSink extends AbstractSink
         if (diskIORatePerSec != 0) {
             diskRateLimiter = RateLimiter.create(diskIORatePerSec);
         }
-        pulsarClientService = new PulsarClientService(context);
 
         if (sinkCounter == null) {
             sinkCounter = new SinkCounter(getName());
@@ -208,6 +210,7 @@ public class PulsarSink extends AbstractSink
     public void start() {
         logger.info("pulsar sink starting");
         sinkCounter.start();
+        pulsarClientService = new PulsarClientService(context);
         pulsarClientService.initCreateConnection(this);
 
         super.start();
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
index b5f22d1e5..c23e88e63 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
@@ -31,7 +31,10 @@ import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.inlong.audit.base.HighPriorityThreadFactory;
 import org.apache.inlong.audit.consts.ConfigConstants;
+import org.apache.inlong.audit.file.ConfigManager;
 import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.audit.MQInfo;
 import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -46,6 +49,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -256,7 +260,6 @@ public class TubeSink extends AbstractSink implements Configurable {
         topic = context.getString(TOPIC);
         Preconditions.checkState(StringUtils.isNotEmpty(topic), "No topic specified");
 
-        masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
         Preconditions.checkState(masterHostAndPortList != null,
                 "No master and port list specified");
 
@@ -363,7 +366,7 @@ public class TubeSink extends AbstractSink implements Configurable {
         }
 
         try {
-            TubeClientConfig conf = initTubeConfig(masterHostAndPortList);
+            TubeClientConfig conf = initTubeConfig();
             sessionFactory = new TubeMultiSessionFactory(conf);
             logger.info("create tube connection successfully");
         } catch (TubeClientException e) {
@@ -383,7 +386,15 @@ public class TubeSink extends AbstractSink implements Configurable {
         }
     }
 
-    private TubeClientConfig initTubeConfig(String masterHostAndPortList) throws Exception {
+    private TubeClientConfig initTubeConfig() throws Exception {
+        ConfigManager configManager = ConfigManager.getInstance();
+        List<MQInfo> mqInfoList = configManager.getMqInfoList();
+        mqInfoList.forEach(mqClusterInfo -> {
+            if (MQType.TUBEMQ.equals(mqClusterInfo.getMqType())) {
+                masterHostAndPortList = mqClusterInfo.getUrl();
+            }
+        });
+
         final TubeClientConfig tubeClientConfig = new TubeClientConfig(masterHostAndPortList);
         tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
         tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
index 9d95b36e9..ab1f07062 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
@@ -23,8 +23,11 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.inlong.audit.consts.AttributeConstants;
+import org.apache.inlong.audit.file.ConfigManager;
 import org.apache.inlong.audit.sink.EventStat;
 import org.apache.inlong.audit.utils.LogCounter;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.audit.MQInfo;
 import org.apache.inlong.common.util.NetworkUtils;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -36,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -94,8 +98,13 @@ public class PulsarClientService {
      * @param context
      */
     public PulsarClientService(Context context) {
-
-        pulsarServerUrl = context.getString(PULSAR_SERVER_URL);
+        ConfigManager configManager = ConfigManager.getInstance();
+        List<MQInfo> mqInfoList = configManager.getMqInfoList();
+        mqInfoList.forEach(mqClusterInfo -> {
+            if (MQType.PULSAR.equals(mqClusterInfo.getMqType())) {
+                pulsarServerUrl = mqClusterInfo.getUrl();
+            }
+        });
         Preconditions.checkState(pulsarServerUrl != null, "No pulsar server url specified");
 
         sendTimeout = context.getInteger(SEND_TIMEOUT, DEFAULT_SEND_TIMEOUT_MILL);
diff --git a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/KafkaSinkTest.java b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/KafkaSinkTest.java
index eed28a208..4f451f87f 100644
--- a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/KafkaSinkTest.java
+++ b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/KafkaSinkTest.java
@@ -49,7 +49,6 @@ public class KafkaSinkTest {
         context = new Context();
 
         context.put("topic", "inlong-audit");
-        context.put("master-host-port-list", "127.0.0.1:8080");
 
         kafkaSink.setChannel(channel);
         Configurables.configure(kafkaSink, context);
diff --git a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
index f0914c0e1..320c55e35 100644
--- a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
+++ b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
@@ -21,7 +21,6 @@ import com.google.common.base.Charsets;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
@@ -32,6 +31,7 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,16 +47,19 @@ public class PulsarSinkTest {
 
     private PulsarSink sink;
     private Channel channel;
+    private Context context;
 
     @Before
-    public void setUp() {
-        sink = new PulsarSink();
+    public void setUp() throws Exception {
+        sink = PowerMockito.mock(PulsarSink.class);
+        PowerMockito.doNothing().when(sink, "start");
+        PowerMockito.when(sink.process()).thenReturn(Sink.Status.READY);
+        PowerMockito.when(sink.getLifecycleState()).thenReturn(LifecycleState.ERROR);
         channel = new MemoryChannel();
 
-        Context context = new Context();
+        context = new Context();
 
         context.put("type", "org.apache.inlong.dataproxy.sink.PulsarSink");
-        context.put("pulsar_server_url", "pulsar://127.0.0.1:6650");
 
         sink.setChannel(channel);
 
@@ -65,7 +68,7 @@ public class PulsarSinkTest {
     }
 
     @Test
-    public void testProcess() throws InterruptedException, EventDeliveryException {
+    public void testProcess() throws Exception {
         setUp();
         Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
         sink.start();
diff --git a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java
index cb309a2cb..650a26dc8 100644
--- a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java
+++ b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java
@@ -50,7 +50,6 @@ public class TubeSinkTest {
         context = new Context();
 
         context.put("topic", "inlong-audit");
-        context.put("master-host-port-list", "127.0.0.1:8080");
 
         tubeSink.setChannel(channel);
         Configurables.configure(tubeSink, context);
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 5fbe0f6ad..fe722a4a9 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -17,22 +17,39 @@
 
 package org.apache.inlong.audit.service;
 
+import com.google.gson.Gson;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
 import org.apache.inlong.audit.config.ClickHouseConfig;
 import org.apache.inlong.audit.config.MessageQueueConfig;
 import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.consts.ConfigConstants;
 import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.file.RemoteConfigJson;
 import org.apache.inlong.audit.service.consume.BaseConsume;
 import org.apache.inlong.audit.service.consume.KafkaConsume;
 import org.apache.inlong.audit.service.consume.PulsarConsume;
 import org.apache.inlong.audit.service.consume.TubeConsume;
+import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
+import org.apache.inlong.common.pojo.audit.MQInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 @Service
 public class AuditMsgConsumerServer implements InitializingBean {
@@ -51,21 +68,37 @@ public class AuditMsgConsumerServer implements InitializingBean {
     // ClickHouseService
     private ClickHouseService ckService;
 
+    private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties";
+
+    private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+
+    private final Gson gson = new Gson();
+
     /**
      * Initializing bean
      */
     public void afterPropertiesSet() {
+        List<MQInfo> mqInfoList = getClusterFromManager();
         BaseConsume mqConsume = null;
         List<InsertData> insertServiceList = this.getInsertServiceList();
-        if (mqConfig.isPulsar()) {
-            mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
-        } else if (mqConfig.isTube()) {
-            mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
-        } else if (mqConfig.isKafka()) {
-            mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
-        } else {
-            LOG.error("unknown MessageQueue {}", mqConfig.getMqType());
-            return;
+
+        for (MQInfo mqInfo : mqInfoList) {
+            if (mqConfig.isPulsar()) {
+                mqConfig.setPulsarServerUrl(mqInfo.getUrl());
+                mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
+                break;
+            } else if (mqConfig.isTube()) {
+                mqConfig.setTubeMasterList(mqInfo.getUrl());
+                mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
+                break;
+            } else if (mqConfig.isKafka()) {
+                mqConfig.setKafkaServerUrl(mqInfo.getUrl());
+                mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
+                break;
+            } else {
+                LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
+                return;
+            }
         }
 
         if (storeConfig.isElasticsearchStore()) {
@@ -79,6 +112,7 @@ public class AuditMsgConsumerServer implements InitializingBean {
 
     /**
      * getInsertServiceList
+     *
      * @return
      */
     private List<InsertData> getInsertServiceList() {
@@ -96,4 +130,61 @@ public class AuditMsgConsumerServer implements InitializingBean {
         }
         return insertServiceList;
     }
+
+    private List<MQInfo> getClusterFromManager() {
+        Properties properties = new Properties();
+        try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES)) {
+            properties.load(inputStream);
+            String managerHosts = properties.getProperty("manager.hosts");
+            String clusterTag = properties.getProperty("proxy.cluster.tag");
+            String[] hostList = StringUtils.split(managerHosts, ",");
+            for (String host : hostList) {
+                List<MQInfo> mqConfig = getMQConfig(host, clusterTag);
+                if (ObjectUtils.isNotEmpty(mqConfig)) {
+                    LOG.info("return mqConfig");
+                    return mqConfig;
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return null;
+    }
+
+    private List<MQInfo> getMQConfig(String host, String clusterTag) {
+        HttpPost httpPost = null;
+        try {
+            String url = "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_CONFIG_PATH;
+            LOG.info("start to request {} to get config info", url);
+            httpPost = new HttpPost(url);
+            httpPost.addHeader(HttpHeaders.CONNECTION, "close");
+
+            // request body
+            AuditConfigRequest request = new AuditConfigRequest();
+            request.setClusterTag(clusterTag);
+            StringEntity stringEntity = new StringEntity(gson.toJson(request));
+            stringEntity.setContentType("application/json");
+            httpPost.setEntity(stringEntity);
+
+            // request with post
+            LOG.info("start to request {} to get config info with params {}", url, request);
+            CloseableHttpResponse response = httpClient.execute(httpPost);
+            String returnStr = EntityUtils.toString(response.getEntity());
+            RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
+            if (configJson.isSuccess() && configJson.getData() != null) {
+                List<MQInfo> mqInfoList = configJson.getData().getMqInfoList();
+                if (mqInfoList != null && !mqInfoList.isEmpty()) {
+                    return mqInfoList;
+                }
+            }
+        } catch (Exception ex) {
+            LOG.error("Failed to get MQ config from manager, please check it", ex);
+            return null;
+        } finally {
+            if (httpPost != null) {
+                httpPost.releaseConnection();
+            }
+        }
+        return null;
+    }
 }
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index b2027f73d..de305bd11 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -48,20 +48,21 @@ audit.config.proxy.type=pulsar
 # store.server: mysql / clickhouse / elasticsearch
 audit.config.store.mode=mysql
 
+# manger config
+manager.hosts=127.0.0.1:8083
+proxy.cluster.tag=default_cluster
+
 # pulsar config
-audit.pulsar.server.url=pulsar://127.0.0.1:6650
 audit.pulsar.topic=persistent://public/default/inlong-audit
 audit.pulsar.consumer.sub.name=inlong-audit-subscription
 audit.pulsar.token=
 audit.pulsar.enable.auth=false
 
 # tube config
-audit.tube.masterlist=127.0.0.1:8715
 audit.tube.topic=inlong-audit
 audit.tube.consumer.group.name=inlong-audit-consumer
 
 # kafka config
-audit.kafka.server.url=127.0.0.1:9092
 audit.kafka.topic=inlong-audit
 audit.kafka.consumer.name=inlong-audit-consumer
 audit.kafka.group.id=audit-consumer-group
diff --git a/inlong-audit/conf/audit-proxy-kafka.conf b/inlong-audit/conf/audit-proxy-kafka.conf
index 7263b7427..df14a6e6d 100644
--- a/inlong-audit/conf/audit-proxy-kafka.conf
+++ b/inlong-audit/conf/audit-proxy-kafka.conf
@@ -58,7 +58,6 @@ agent1.channels.ch-msg2.fsyncInterval = 10
 
 agent1.sinks.kafka-sink-msg1.channel = ch-msg1
 agent1.sinks.kafka-sink-msg1.type =  org.apache.inlong.audit.sink.KafkaSink
-agent1.sinks.kafka-sink-msg1.bootstrap_servers = localhost:9092
 agent1.sinks.kafka-sink-msg1.topic = inlong-audit
 agent1.sinks.kafka-sink-msg1.retries = 0
 agent1.sinks.kafka-sink-msg1.batch_size = 16384
@@ -67,7 +66,6 @@ agent1.sinks.kafka-sink-msg1.buffer_memory = 33554432
 
 agent1.sinks.kafka-sink-msg2.channel = ch-msg1
 agent1.sinks.kafka-sink-msg2.type =  org.apache.inlong.audit.sink.KafkaSink
-agent1.sinks.kafka-sink-msg2.bootstrap_servers = localhost:9092
 agent1.sinks.kafka-sink-msg2.topic = inlong-audit
 agent1.sinks.kafka-sink-msg2.retries = 0
 agent1.sinks.kafka-sink-msg2.batch_size = 16384
diff --git a/inlong-audit/conf/audit-proxy-pulsar.conf b/inlong-audit/conf/audit-proxy-pulsar.conf
index a64e73aab..633faff83 100644
--- a/inlong-audit/conf/audit-proxy-pulsar.conf
+++ b/inlong-audit/conf/audit-proxy-pulsar.conf
@@ -59,7 +59,6 @@ agent1.channels.ch-msg2.fsyncInterval = 10
 
 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.audit.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg1.pulsar_server_url = pulsar://localhost:6650
 agent1.sinks.pulsar-sink-msg1.topic = persistent://public/default/inlong-audit
 agent1.sinks.pulsar-sink-msg1.enable_token_auth = false
 agent1.sinks.pulsar-sink-msg1.auth_token =
@@ -77,7 +76,6 @@ agent1.sinks.pulsar-sink-msg1.disk_io_rate_per_sec= 20000000
 
 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.audit.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg2.pulsar_server_url = pulsar://localhost:6650
 agent1.sinks.pulsar-sink-msg2.topic = persistent://public/default/inlong-audit
 agent1.sinks.pulsar-sink-msg1.enable_token_auth = false
 agent1.sinks.pulsar-sink-msg1.auth_token =
diff --git a/inlong-audit/conf/audit-proxy-tubemq.conf b/inlong-audit/conf/audit-proxy-tubemq.conf
index 69bb76b12..d4b9cc95f 100644
--- a/inlong-audit/conf/audit-proxy-tubemq.conf
+++ b/inlong-audit/conf/audit-proxy-tubemq.conf
@@ -61,7 +61,6 @@ agent1.channels.ch-msg2.fsyncInterval = 10
 
 agent1.sinks.tube-sink-msg1.channel = ch-msg1
 agent1.sinks.tube-sink-msg1.type =  org.apache.inlong.audit.sink.TubeSink
-agent1.sinks.tube-sink-msg1.master-host-port-list = localhost:8715
 agent1.sinks.tube-sink-msg1.topic = inlong-audit
 agent1.sinks.tube-sink-msg1.send_timeout = 30000
 agent1.sinks.tube-sink-msg1.stat-interval-sec = 60
@@ -75,7 +74,6 @@ agent1.sinks.tube-sink-msg1.set=10
 
 agent1.sinks.tube-sink-msg2.channel = ch-msg2
 agent1.sinks.tube-sink-msg2.type = org.apache.inlong.audit.sink.TubeSink
-agent1.sinks.tube-sink-msg2.master-host-port-list = localhost:8715
 agent1.sinks.tube-sink-msg2.topic = inlong-audit
 agent1.sinks.tube-sink-msg2.send_timeout = 30000
 agent1.sinks.tube-sink-msg2.stat-interval-sec = 60
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfig.java
similarity index 56%
copy from inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfig.java
index 9172ff736..b42d7088b 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfig.java
@@ -15,40 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.file;
+package org.apache.inlong.common.pojo.audit;
 
-import java.util.List;
-
-public class RemoteConfigJson {
-
-    private boolean result;
-    private List<DataItem> data;
-    private int errCode;
-
-    public List<DataItem> getData() {
-        return data;
-    }
-
-    public int getErrCode() {
-        return errCode;
-    }
+import lombok.Data;
 
-    public static class DataItem {
-
-        private String groupId;
-        private String topic;
-        private String m;
-
-        public String getGroupId() {
-            return groupId;
-        }
+import java.util.ArrayList;
+import java.util.List;
 
-        public String getTopic() {
-            return topic;
-        }
+/**
+ * Audit config, includes MQ server URL and other params.
+ */
+@Data
+public class AuditConfig {
 
-        public String getM() {
-            return m;
-        }
-    }
+    private List<MQInfo> mqInfoList = new ArrayList<>();
 }
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfigRequest.java
similarity index 55%
copy from inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfigRequest.java
index 9172ff736..30851fbe5 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfigRequest.java
@@ -15,40 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.file;
+package org.apache.inlong.common.pojo.audit;
 
-import java.util.List;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-public class RemoteConfigJson {
-
-    private boolean result;
-    private List<DataItem> data;
-    private int errCode;
-
-    public List<DataItem> getData() {
-        return data;
-    }
-
-    public int getErrCode() {
-        return errCode;
-    }
-
-    public static class DataItem {
-
-        private String groupId;
-        private String topic;
-        private String m;
-
-        public String getGroupId() {
-            return groupId;
-        }
-
-        public String getTopic() {
-            return topic;
-        }
+/**
+ * Audit MQ config request info.
+ */
+@Data
+@NoArgsConstructor
+public class AuditConfigRequest {
 
-        public String getM() {
-            return m;
-        }
-    }
+    private String clusterTag;
 }
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/MQInfo.java
similarity index 55%
copy from inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/MQInfo.java
index 9172ff736..9487663f8 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/MQInfo.java
@@ -15,40 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.file;
+package org.apache.inlong.common.pojo.audit;
 
-import java.util.List;
+import lombok.Data;
 
-public class RemoteConfigJson {
+import java.util.HashMap;
+import java.util.Map;
 
-    private boolean result;
-    private List<DataItem> data;
-    private int errCode;
-
-    public List<DataItem> getData() {
-        return data;
-    }
-
-    public int getErrCode() {
-        return errCode;
-    }
-
-    public static class DataItem {
-
-        private String groupId;
-        private String topic;
-        private String m;
-
-        public String getGroupId() {
-            return groupId;
-        }
-
-        public String getTopic() {
-            return topic;
-        }
+/**
+ * MQ info
+ */
+@Data
+public class MQInfo {
 
-        public String getM() {
-            return m;
-        }
-    }
+    private String url;
+    private String mqType;
+    private Map<String, String> params = new HashMap<>();
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index fac486088..e7c258350 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.cluster;
 
+import org.apache.inlong.common.pojo.audit.AuditConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
@@ -419,6 +420,14 @@ public interface InlongClusterService {
      */
     String getAllConfig(String clusterName, String md5);
 
+    /**
+     * Get the MQ info by cluster tag for Audit
+     *
+     * @param clusterTag cluster tag
+     * @return MQ info
+     */
+    AuditConfig getAuditConfig(String clusterTag);
+
     /**
      * Test whether the connection can be successfully established.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 7d04689f5..7d9b05709 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -26,6 +26,8 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.audit.AuditConfig;
+import org.apache.inlong.common.pojo.audit.MQInfo;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
@@ -1329,6 +1331,27 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         return configJson;
     }
 
+    @Override
+    public AuditConfig getAuditConfig(String clusterTag) {
+        AuditConfig auditConfig = new AuditConfig();
+        ClusterPageRequest request = ClusterPageRequest.builder()
+                .clusterTag(clusterTag)
+                .typeList(Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR, ClusterType.KAFKA))
+                .build();
+        List<InlongClusterEntity> clusterEntityList = clusterMapper.selectByCondition(request);
+        LOGGER.info("clusterEntityList {}", clusterEntityList);
+        List<MQInfo> mqInfoList = new ArrayList<>();
+        for (InlongClusterEntity entity : clusterEntityList) {
+            MQInfo info = new MQInfo();
+            info.setUrl(entity.getUrl());
+            info.setMqType(entity.getType());
+            info.setParams(GSON.fromJson(entity.getExtParams(), Map.class));
+            mqInfoList.add(info);
+        }
+        auditConfig.setMqInfoList(mqInfoList);
+        return auditConfig;
+    }
+
     @Override
     public Boolean testConnection(ClusterRequest request) {
         LOGGER.info("begin test connection for: {}", request);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java
new file mode 100644
index 000000000..f865394f2
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller.openapi;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.pojo.audit.AuditConfig;
+import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Audit controller.
+ */
+@RestController("OpenAuditController")
+@RequestMapping("/openapi")
+@Api(tags = "Open-Audit-API")
+public class AuditController {
+
+    @Autowired
+    private InlongClusterService clusterService;
+
+    @PostMapping("/audit/getConfig")
+    @ApiOperation(value = "Get mq config list")
+    public Response<AuditConfig> getConfig(@RequestBody AuditConfigRequest request) {
+        AuditConfig auditConfig = clusterService.getAuditConfig(request.getClusterTag());
+        if (CollectionUtils.isEmpty(auditConfig.getMqInfoList())) {
+            return Response.fail("Failed to get MQ config of cluster tag: " + request.getClusterTag());
+        }
+        return Response.success(auditConfig);
+    }
+}