You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/17 11:02:01 UTC
[inlong] branch master updated: [INLONG-7413][Audit] Audit get MQ config from Manager (#7629)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 01dfd9d65 [INLONG-7413][Audit] Audit get MQ config from Manager (#7629)
01dfd9d65 is described below
commit 01dfd9d6557de91472b52583d81c1ceda50661f5
Author: haifxu <xh...@gmail.com>
AuthorDate: Fri Mar 17 19:01:55 2023 +0800
[INLONG-7413][Audit] Audit get MQ config from Manager (#7629)
---
.../inlong/audit/consts/ConfigConstants.java | 4 +
.../apache/inlong/audit/file/ConfigManager.java | 61 ++++++++----
.../apache/inlong/audit/file/RemoteConfigJson.java | 35 ++-----
.../org/apache/inlong/audit/node/Application.java | 22 +++--
.../org/apache/inlong/audit/sink/KafkaSink.java | 15 ++-
.../org/apache/inlong/audit/sink/PulsarSink.java | 5 +-
.../org/apache/inlong/audit/sink/TubeSink.java | 17 +++-
.../audit/sink/pulsar/PulsarClientService.java | 13 ++-
.../apache/inlong/audit/sink/KafkaSinkTest.java | 1 -
.../apache/inlong/audit/sink/PulsarSinkTest.java | 15 +--
.../org/apache/inlong/audit/sink/TubeSinkTest.java | 1 -
.../audit/service/AuditMsgConsumerServer.java | 109 +++++++++++++++++++--
inlong-audit/conf/application.properties | 7 +-
inlong-audit/conf/audit-proxy-kafka.conf | 2 -
inlong-audit/conf/audit-proxy-pulsar.conf | 2 -
inlong-audit/conf/audit-proxy-tubemq.conf | 2 -
.../inlong/common/pojo/audit/AuditConfig.java | 42 ++------
.../common/pojo/audit/AuditConfigRequest.java | 43 ++------
.../apache/inlong/common/pojo/audit/MQInfo.java | 44 +++------
.../service/cluster/InlongClusterService.java | 9 ++
.../service/cluster/InlongClusterServiceImpl.java | 23 +++++
.../web/controller/openapi/AuditController.java | 53 ++++++++++
22 files changed, 342 insertions(+), 183 deletions(-)
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
index 4c8b93671..987a83a0d 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/ConfigConstants.java
@@ -66,4 +66,8 @@ public class ConfigConstants {
public static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 4000000L;
public static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L;
+
+ public static final String MANAGER_PATH = "/inlong/manager/openapi";
+
+ public static final String MANAGER_GET_CONFIG_PATH = "/audit/getConfig";
}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
index 80fe5a642..a86a59502 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
@@ -22,14 +22,20 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
+import org.apache.inlong.audit.consts.ConfigConstants;
import org.apache.inlong.audit.file.holder.PropertiesConfigHolder;
+import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
+import org.apache.inlong.common.pojo.audit.MQInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,9 +48,11 @@ public class ConfigManager {
private static final Map<String, ConfigHolder> holderMap =
new ConcurrentHashMap<>();
+ public static List<MQInfo> mqInfoList = new ArrayList<>();
+
private static ConfigManager instance = null;
- private static String DEFAULT_CONFIG_PROPERTIES = "server.properties";
+ private static String DEFAULT_CONFIG_PROPERTIES = "application.properties";
static {
instance = getInstance(DEFAULT_CONFIG_PROPERTIES, true);
@@ -123,6 +131,10 @@ public class ConfigManager {
}
}
+ public List<MQInfo> getMqInfoList() {
+ return mqInfoList;
+ }
+
public ConfigHolder getDefaultConfigHolder() {
return holderMap.get(DEFAULT_CONFIG_PROPERTIES);
}
@@ -189,33 +201,41 @@ public class ConfigManager {
}
}
- private boolean checkWithManager(String host) {
- HttpGet httpGet = null;
+ private boolean checkWithManager(String host, String clusterTag) {
+ HttpPost httpPost = null;
try {
- String url = "http://" + host + "/inlong/manager/openapi/audit/getConfig";
+ String url = "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_CONFIG_PATH;
LOG.info("start to request {} to get config info", url);
- httpGet = new HttpGet(url);
- httpGet.addHeader(HttpHeaders.CONNECTION, "close");
+ httpPost = new HttpPost(url);
+ httpPost.addHeader(HttpHeaders.CONNECTION, "close");
+
+ // request body
+ AuditConfigRequest request = new AuditConfigRequest();
+ request.setClusterTag(clusterTag);
+ StringEntity stringEntity = new StringEntity(gson.toJson(request));
+ stringEntity.setContentType("application/json");
+ httpPost.setEntity(stringEntity);
// request with post
- CloseableHttpResponse response = httpClient.execute(httpGet);
+ LOG.info("start to request {} to get config info with params {}", url, request);
+ CloseableHttpResponse response = httpClient.execute(httpPost);
String returnStr = EntityUtils.toString(response.getEntity());
// get groupId <-> topic and m value.
- Map<String, String> configJsonMap = gson.fromJson(returnStr, Map.class);
- if (configJsonMap != null && configJsonMap.size() > 0) {
- for (Entry<String, String> entry : configJsonMap.entrySet()) {
- Map<String, String> valueMap = gson.fromJson(entry.getValue(), Map.class);
- configManager.updatePropertiesHolder(valueMap,
- entry.getKey(), true);
+ RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
+ if (configJson.isSuccess() && configJson.getData() != null) {
+ mqInfoList = configJson.getData().getMqInfoList();
+ if (mqInfoList == null || mqInfoList.isEmpty()) {
+ LOG.error("getConfig from manager: no available mq config");
+ return false;
}
}
} catch (Exception ex) {
LOG.error("exception caught", ex);
return false;
} finally {
- if (httpGet != null) {
- httpGet.releaseConnection();
+ if (httpPost != null) {
+ httpPost.releaseConnection();
}
}
return true;
@@ -224,10 +244,13 @@ public class ConfigManager {
private void checkRemoteConfig() {
try {
- String managerHosts = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("manager_hosts");
+ String managerHosts = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES).get("manager.hosts");
+ String proxyClusterTag = configManager.getProperties(DEFAULT_CONFIG_PROPERTIES)
+ .get("proxy.cluster.tag");
+ LOG.info("manager url: {}", managerHosts);
String[] hostList = StringUtils.split(managerHosts, ",");
for (String host : hostList) {
- if (checkWithManager(host)) {
+ if (checkWithManager(host, proxyClusterTag)) {
break;
}
}
@@ -242,7 +265,6 @@ public class ConfigManager {
while (isRunning) {
long sleepTimeInMs = getSleepTime();
- count += 1;
try {
checkLocalFile();
// wait for 30 seconds to update remote config
@@ -254,6 +276,7 @@ public class ConfigManager {
} catch (Exception ex) {
LOG.error("exception caught", ex);
}
+ count += 1;
}
}
}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
index 9172ff736..8ab17c7af 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
@@ -17,38 +17,23 @@
package org.apache.inlong.audit.file;
-import java.util.List;
+import org.apache.inlong.common.pojo.audit.AuditConfig;
public class RemoteConfigJson {
- private boolean result;
- private List<DataItem> data;
- private int errCode;
+ private boolean success;
+ private String errMsg;
+ private AuditConfig data;
- public List<DataItem> getData() {
- return data;
+ public boolean isSuccess() {
+ return success;
}
- public int getErrCode() {
- return errCode;
+ public String getErrMsg() {
+ return errMsg;
}
- public static class DataItem {
-
- private String groupId;
- private String topic;
- private String m;
-
- public String getGroupId() {
- return groupId;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public String getM() {
- return m;
- }
+ public AuditConfig getData() {
+ return data;
}
}
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
index c407d8716..732dae977 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -21,15 +21,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -52,9 +43,20 @@ import org.apache.flume.node.MaterializedConfiguration;
import org.apache.flume.node.PollingPropertiesFileConfigurationProvider;
import org.apache.flume.node.PropertiesFileConfigurationProvider;
import org.apache.flume.util.SSLUtil;
+import org.apache.inlong.audit.file.ConfigManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
*
* Application
@@ -294,6 +296,8 @@ public class Application {
Application application;
+ ConfigManager.getInstance();
+
File configurationFile = new File(commandLine.getOptionValue('f'));
// The following is to ensure that by default the agent will fail on startup
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
index 1f357bec5..a4b67c9ed 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
@@ -28,7 +28,10 @@ import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -40,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
@@ -54,6 +58,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
// for kafka producer
private static Properties properties = new Properties();
+ private String kafkaServerUrl;
private static final String BOOTSTRAP_SERVER = "bootstrap_servers";
private static final String TOPIC = "topic";
private static final String RETRIES = "retries";
@@ -247,7 +252,6 @@ public class KafkaSink extends AbstractSink implements Configurable {
localIp = NetworkUtils.getLocalIp();
properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getString(BOOTSTRAP_SERVER));
properties.put(ProducerConfig.ACKS_CONFIG, defaultAcks);
properties.put(ProducerConfig.RETRIES_CONFIG, context.getString(RETRIES, defaultRetries));
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, context.getString(BATCH_SIZE, defaultBatchSize));
@@ -256,6 +260,15 @@ public class KafkaSink extends AbstractSink implements Configurable {
}
private void initTopicProducer(String topic) {
+ ConfigManager configManager = ConfigManager.getInstance();
+ List<MQInfo> mqInfoList = configManager.getMqInfoList();
+ mqInfoList.forEach(mqClusterInfo -> {
+ if (MQType.KAFKA.equals(mqClusterInfo.getMqType())) {
+ kafkaServerUrl = mqClusterInfo.getUrl();
+ }
+ });
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
+
if (StringUtils.isEmpty(topic)) {
logger.error("topic is empty");
}
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
index e374b5de2..30b388097 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
@@ -148,6 +148,8 @@ public class PulsarSink extends AbstractSink
private String topic;
+ private Context context;
+
static {
/*
* stat pulsar performance
@@ -168,6 +170,7 @@ public class PulsarSink extends AbstractSink
*/
public void configure(Context context) {
logger.info("PulsarSink started and context = {}", context.toString());
+ this.context = context;
/*
* topic config
*/
@@ -188,7 +191,6 @@ public class PulsarSink extends AbstractSink
if (diskIORatePerSec != 0) {
diskRateLimiter = RateLimiter.create(diskIORatePerSec);
}
- pulsarClientService = new PulsarClientService(context);
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
@@ -208,6 +210,7 @@ public class PulsarSink extends AbstractSink
public void start() {
logger.info("pulsar sink starting");
sinkCounter.start();
+ pulsarClientService = new PulsarClientService(context);
pulsarClientService.initCreateConnection(this);
super.start();
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
index b5f22d1e5..c23e88e63 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
@@ -31,7 +31,10 @@ import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
import org.apache.inlong.audit.consts.ConfigConstants;
+import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -46,6 +49,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -256,7 +260,6 @@ public class TubeSink extends AbstractSink implements Configurable {
topic = context.getString(TOPIC);
Preconditions.checkState(StringUtils.isNotEmpty(topic), "No topic specified");
- masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
Preconditions.checkState(masterHostAndPortList != null,
"No master and port list specified");
@@ -363,7 +366,7 @@ public class TubeSink extends AbstractSink implements Configurable {
}
try {
- TubeClientConfig conf = initTubeConfig(masterHostAndPortList);
+ TubeClientConfig conf = initTubeConfig();
sessionFactory = new TubeMultiSessionFactory(conf);
logger.info("create tube connection successfully");
} catch (TubeClientException e) {
@@ -383,7 +386,15 @@ public class TubeSink extends AbstractSink implements Configurable {
}
}
- private TubeClientConfig initTubeConfig(String masterHostAndPortList) throws Exception {
+ private TubeClientConfig initTubeConfig() throws Exception {
+ ConfigManager configManager = ConfigManager.getInstance();
+ List<MQInfo> mqInfoList = configManager.getMqInfoList();
+ mqInfoList.forEach(mqClusterInfo -> {
+ if (MQType.TUBEMQ.equals(mqClusterInfo.getMqType())) {
+ masterHostAndPortList = mqClusterInfo.getUrl();
+ }
+ });
+
final TubeClientConfig tubeClientConfig = new TubeClientConfig(masterHostAndPortList);
tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
index 9d95b36e9..ab1f07062 100644
--- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
+++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
@@ -23,8 +23,11 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.inlong.audit.consts.AttributeConstants;
+import org.apache.inlong.audit.file.ConfigManager;
import org.apache.inlong.audit.sink.EventStat;
import org.apache.inlong.audit.utils.LogCounter;
+import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -36,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -94,8 +98,13 @@ public class PulsarClientService {
* @param context
*/
public PulsarClientService(Context context) {
-
- pulsarServerUrl = context.getString(PULSAR_SERVER_URL);
+ ConfigManager configManager = ConfigManager.getInstance();
+ List<MQInfo> mqInfoList = configManager.getMqInfoList();
+ mqInfoList.forEach(mqClusterInfo -> {
+ if (MQType.PULSAR.equals(mqClusterInfo.getMqType())) {
+ pulsarServerUrl = mqClusterInfo.getUrl();
+ }
+ });
Preconditions.checkState(pulsarServerUrl != null, "No pulsar server url specified");
sendTimeout = context.getInteger(SEND_TIMEOUT, DEFAULT_SEND_TIMEOUT_MILL);
diff --git a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/KafkaSinkTest.java b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/KafkaSinkTest.java
index eed28a208..4f451f87f 100644
--- a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/KafkaSinkTest.java
+++ b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/KafkaSinkTest.java
@@ -49,7 +49,6 @@ public class KafkaSinkTest {
context = new Context();
context.put("topic", "inlong-audit");
- context.put("master-host-port-list", "127.0.0.1:8080");
kafkaSink.setChannel(channel);
Configurables.configure(kafkaSink, context);
diff --git a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
index f0914c0e1..320c55e35 100644
--- a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
+++ b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
@@ -21,7 +21,6 @@ import com.google.common.base.Charsets;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
@@ -32,6 +31,7 @@ import org.apache.flume.lifecycle.LifecycleState;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,16 +47,19 @@ public class PulsarSinkTest {
private PulsarSink sink;
private Channel channel;
+ private Context context;
@Before
- public void setUp() {
- sink = new PulsarSink();
+ public void setUp() throws Exception {
+ sink = PowerMockito.mock(PulsarSink.class);
+ PowerMockito.doNothing().when(sink, "start");
+ PowerMockito.when(sink.process()).thenReturn(Sink.Status.READY);
+ PowerMockito.when(sink.getLifecycleState()).thenReturn(LifecycleState.ERROR);
channel = new MemoryChannel();
- Context context = new Context();
+ context = new Context();
context.put("type", "org.apache.inlong.dataproxy.sink.PulsarSink");
- context.put("pulsar_server_url", "pulsar://127.0.0.1:6650");
sink.setChannel(channel);
@@ -65,7 +68,7 @@ public class PulsarSinkTest {
}
@Test
- public void testProcess() throws InterruptedException, EventDeliveryException {
+ public void testProcess() throws Exception {
setUp();
Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
sink.start();
diff --git a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java
index cb309a2cb..650a26dc8 100644
--- a/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java
+++ b/inlong-audit/audit-proxy/src/test/java/org/apache/inlong/audit/sink/TubeSinkTest.java
@@ -50,7 +50,6 @@ public class TubeSinkTest {
context = new Context();
context.put("topic", "inlong-audit");
- context.put("master-host-port-list", "127.0.0.1:8080");
tubeSink.setChannel(channel);
Configurables.configure(tubeSink, context);
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index 5fbe0f6ad..fe722a4a9 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -17,22 +17,39 @@
package org.apache.inlong.audit.service;
+import com.google.gson.Gson;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
import org.apache.inlong.audit.config.ClickHouseConfig;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
+import org.apache.inlong.audit.consts.ConfigConstants;
import org.apache.inlong.audit.db.dao.AuditDataDao;
+import org.apache.inlong.audit.file.RemoteConfigJson;
import org.apache.inlong.audit.service.consume.BaseConsume;
import org.apache.inlong.audit.service.consume.KafkaConsume;
import org.apache.inlong.audit.service.consume.PulsarConsume;
import org.apache.inlong.audit.service.consume.TubeConsume;
+import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
+import org.apache.inlong.common.pojo.audit.MQInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
@Service
public class AuditMsgConsumerServer implements InitializingBean {
@@ -51,21 +68,37 @@ public class AuditMsgConsumerServer implements InitializingBean {
// ClickHouseService
private ClickHouseService ckService;
+ private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties";
+
+ private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+
+ private final Gson gson = new Gson();
+
/**
* Initializing bean
*/
public void afterPropertiesSet() {
+ List<MQInfo> mqInfoList = getClusterFromManager();
BaseConsume mqConsume = null;
List<InsertData> insertServiceList = this.getInsertServiceList();
- if (mqConfig.isPulsar()) {
- mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
- } else if (mqConfig.isTube()) {
- mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
- } else if (mqConfig.isKafka()) {
- mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
- } else {
- LOG.error("unknown MessageQueue {}", mqConfig.getMqType());
- return;
+
+ for (MQInfo mqInfo : mqInfoList) {
+ if (mqConfig.isPulsar()) {
+ mqConfig.setPulsarServerUrl(mqInfo.getUrl());
+ mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
+ break;
+ } else if (mqConfig.isTube()) {
+ mqConfig.setTubeMasterList(mqInfo.getUrl());
+ mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
+ break;
+ } else if (mqConfig.isKafka()) {
+ mqConfig.setKafkaServerUrl(mqInfo.getUrl());
+ mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
+ break;
+ } else {
+ LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
+ return;
+ }
}
if (storeConfig.isElasticsearchStore()) {
@@ -79,6 +112,7 @@ public class AuditMsgConsumerServer implements InitializingBean {
/**
* getInsertServiceList
+ *
* @return
*/
private List<InsertData> getInsertServiceList() {
@@ -96,4 +130,61 @@ public class AuditMsgConsumerServer implements InitializingBean {
}
return insertServiceList;
}
+
+ private List<MQInfo> getClusterFromManager() {
+ Properties properties = new Properties();
+ try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES)) {
+ properties.load(inputStream);
+ String managerHosts = properties.getProperty("manager.hosts");
+ String clusterTag = properties.getProperty("proxy.cluster.tag");
+ String[] hostList = StringUtils.split(managerHosts, ",");
+ for (String host : hostList) {
+ List<MQInfo> mqConfig = getMQConfig(host, clusterTag);
+ if (ObjectUtils.isNotEmpty(mqConfig)) {
+ LOG.info("return mqConfig");
+ return mqConfig;
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+ private List<MQInfo> getMQConfig(String host, String clusterTag) {
+ HttpPost httpPost = null;
+ try {
+ String url = "http://" + host + ConfigConstants.MANAGER_PATH + ConfigConstants.MANAGER_GET_CONFIG_PATH;
+ LOG.info("start to request {} to get config info", url);
+ httpPost = new HttpPost(url);
+ httpPost.addHeader(HttpHeaders.CONNECTION, "close");
+
+ // request body
+ AuditConfigRequest request = new AuditConfigRequest();
+ request.setClusterTag(clusterTag);
+ StringEntity stringEntity = new StringEntity(gson.toJson(request));
+ stringEntity.setContentType("application/json");
+ httpPost.setEntity(stringEntity);
+
+ // request with post
+ LOG.info("start to request {} to get config info with params {}", url, request);
+ CloseableHttpResponse response = httpClient.execute(httpPost);
+ String returnStr = EntityUtils.toString(response.getEntity());
+ RemoteConfigJson configJson = gson.fromJson(returnStr, RemoteConfigJson.class);
+ if (configJson.isSuccess() && configJson.getData() != null) {
+ List<MQInfo> mqInfoList = configJson.getData().getMqInfoList();
+ if (mqInfoList != null && !mqInfoList.isEmpty()) {
+ return mqInfoList;
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("Failed to get MQ config from manager, please check it", ex);
+ return null;
+ } finally {
+ if (httpPost != null) {
+ httpPost.releaseConnection();
+ }
+ }
+ return null;
+ }
}
diff --git a/inlong-audit/conf/application.properties b/inlong-audit/conf/application.properties
index b2027f73d..de305bd11 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -48,20 +48,21 @@ audit.config.proxy.type=pulsar
# store.server: mysql / clickhouse / elasticsearch
audit.config.store.mode=mysql
+# manager config
+manager.hosts=127.0.0.1:8083
+proxy.cluster.tag=default_cluster
+
# pulsar config
-audit.pulsar.server.url=pulsar://127.0.0.1:6650
audit.pulsar.topic=persistent://public/default/inlong-audit
audit.pulsar.consumer.sub.name=inlong-audit-subscription
audit.pulsar.token=
audit.pulsar.enable.auth=false
# tube config
-audit.tube.masterlist=127.0.0.1:8715
audit.tube.topic=inlong-audit
audit.tube.consumer.group.name=inlong-audit-consumer
# kafka config
-audit.kafka.server.url=127.0.0.1:9092
audit.kafka.topic=inlong-audit
audit.kafka.consumer.name=inlong-audit-consumer
audit.kafka.group.id=audit-consumer-group
diff --git a/inlong-audit/conf/audit-proxy-kafka.conf b/inlong-audit/conf/audit-proxy-kafka.conf
index 7263b7427..df14a6e6d 100644
--- a/inlong-audit/conf/audit-proxy-kafka.conf
+++ b/inlong-audit/conf/audit-proxy-kafka.conf
@@ -58,7 +58,6 @@ agent1.channels.ch-msg2.fsyncInterval = 10
agent1.sinks.kafka-sink-msg1.channel = ch-msg1
agent1.sinks.kafka-sink-msg1.type = org.apache.inlong.audit.sink.KafkaSink
-agent1.sinks.kafka-sink-msg1.bootstrap_servers = localhost:9092
agent1.sinks.kafka-sink-msg1.topic = inlong-audit
agent1.sinks.kafka-sink-msg1.retries = 0
agent1.sinks.kafka-sink-msg1.batch_size = 16384
@@ -67,7 +66,6 @@ agent1.sinks.kafka-sink-msg1.buffer_memory = 33554432
agent1.sinks.kafka-sink-msg2.channel = ch-msg1
agent1.sinks.kafka-sink-msg2.type = org.apache.inlong.audit.sink.KafkaSink
-agent1.sinks.kafka-sink-msg2.bootstrap_servers = localhost:9092
agent1.sinks.kafka-sink-msg2.topic = inlong-audit
agent1.sinks.kafka-sink-msg2.retries = 0
agent1.sinks.kafka-sink-msg2.batch_size = 16384
diff --git a/inlong-audit/conf/audit-proxy-pulsar.conf b/inlong-audit/conf/audit-proxy-pulsar.conf
index a64e73aab..633faff83 100644
--- a/inlong-audit/conf/audit-proxy-pulsar.conf
+++ b/inlong-audit/conf/audit-proxy-pulsar.conf
@@ -59,7 +59,6 @@ agent1.channels.ch-msg2.fsyncInterval = 10
agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.audit.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg1.pulsar_server_url = pulsar://localhost:6650
agent1.sinks.pulsar-sink-msg1.topic = persistent://public/default/inlong-audit
agent1.sinks.pulsar-sink-msg1.enable_token_auth = false
agent1.sinks.pulsar-sink-msg1.auth_token =
@@ -77,7 +76,6 @@ agent1.sinks.pulsar-sink-msg1.disk_io_rate_per_sec= 20000000
agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.audit.sink.PulsarSink
-agent1.sinks.pulsar-sink-msg2.pulsar_server_url = pulsar://localhost:6650
agent1.sinks.pulsar-sink-msg2.topic = persistent://public/default/inlong-audit
agent1.sinks.pulsar-sink-msg1.enable_token_auth = false
agent1.sinks.pulsar-sink-msg1.auth_token =
diff --git a/inlong-audit/conf/audit-proxy-tubemq.conf b/inlong-audit/conf/audit-proxy-tubemq.conf
index 69bb76b12..d4b9cc95f 100644
--- a/inlong-audit/conf/audit-proxy-tubemq.conf
+++ b/inlong-audit/conf/audit-proxy-tubemq.conf
@@ -61,7 +61,6 @@ agent1.channels.ch-msg2.fsyncInterval = 10
agent1.sinks.tube-sink-msg1.channel = ch-msg1
agent1.sinks.tube-sink-msg1.type = org.apache.inlong.audit.sink.TubeSink
-agent1.sinks.tube-sink-msg1.master-host-port-list = localhost:8715
agent1.sinks.tube-sink-msg1.topic = inlong-audit
agent1.sinks.tube-sink-msg1.send_timeout = 30000
agent1.sinks.tube-sink-msg1.stat-interval-sec = 60
@@ -75,7 +74,6 @@ agent1.sinks.tube-sink-msg1.set=10
agent1.sinks.tube-sink-msg2.channel = ch-msg2
agent1.sinks.tube-sink-msg2.type = org.apache.inlong.audit.sink.TubeSink
-agent1.sinks.tube-sink-msg2.master-host-port-list = localhost:8715
agent1.sinks.tube-sink-msg2.topic = inlong-audit
agent1.sinks.tube-sink-msg2.send_timeout = 30000
agent1.sinks.tube-sink-msg2.stat-interval-sec = 60
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfig.java
similarity index 56%
copy from inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfig.java
index 9172ff736..b42d7088b 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfig.java
@@ -15,40 +15,18 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.file;
+package org.apache.inlong.common.pojo.audit;
-import java.util.List;
-
-public class RemoteConfigJson {
-
- private boolean result;
- private List<DataItem> data;
- private int errCode;
-
- public List<DataItem> getData() {
- return data;
- }
-
- public int getErrCode() {
- return errCode;
- }
+import lombok.Data;
- public static class DataItem {
-
- private String groupId;
- private String topic;
- private String m;
-
- public String getGroupId() {
- return groupId;
- }
+import java.util.ArrayList;
+import java.util.List;
- public String getTopic() {
- return topic;
- }
+/**
+ * Audit config, includes MQ server URL and other params.
+ */
+@Data
+public class AuditConfig {
- public String getM() {
- return m;
- }
- }
+ private List<MQInfo> mqInfoList = new ArrayList<>();
}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfigRequest.java
similarity index 55%
copy from inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfigRequest.java
index 9172ff736..30851fbe5 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/AuditConfigRequest.java
@@ -15,40 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.file;
+package org.apache.inlong.common.pojo.audit;
-import java.util.List;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-public class RemoteConfigJson {
-
- private boolean result;
- private List<DataItem> data;
- private int errCode;
-
- public List<DataItem> getData() {
- return data;
- }
-
- public int getErrCode() {
- return errCode;
- }
-
- public static class DataItem {
-
- private String groupId;
- private String topic;
- private String m;
-
- public String getGroupId() {
- return groupId;
- }
-
- public String getTopic() {
- return topic;
- }
+/**
+ * Audit MQ config request info.
+ */
+@Data
+@NoArgsConstructor
+public class AuditConfigRequest {
- public String getM() {
- return m;
- }
- }
+ private String clusterTag;
}
diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/MQInfo.java
similarity index 55%
copy from inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/MQInfo.java
index 9172ff736..9487663f8 100644
--- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/RemoteConfigJson.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/audit/MQInfo.java
@@ -15,40 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.file;
+package org.apache.inlong.common.pojo.audit;
-import java.util.List;
+import lombok.Data;
-public class RemoteConfigJson {
+import java.util.HashMap;
+import java.util.Map;
- private boolean result;
- private List<DataItem> data;
- private int errCode;
-
- public List<DataItem> getData() {
- return data;
- }
-
- public int getErrCode() {
- return errCode;
- }
-
- public static class DataItem {
-
- private String groupId;
- private String topic;
- private String m;
-
- public String getGroupId() {
- return groupId;
- }
-
- public String getTopic() {
- return topic;
- }
+/**
+ * MQ info: the server URL, the MQ type and additional params of one MQ cluster.
+ */
+@Data
+public class MQInfo {
- public String getM() {
- return m;
- }
- }
+ private String url;
+ private String mqType;
+ private Map<String, String> params = new HashMap<>();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index fac486088..e7c258350 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.cluster;
+import org.apache.inlong.common.pojo.audit.AuditConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
@@ -419,6 +420,14 @@ public interface InlongClusterService {
*/
String getAllConfig(String clusterName, String md5);
+ /**
+ * Get the Audit config, which carries the MQ info list, by cluster tag.
+ *
+ * @param clusterTag cluster tag
+ * @return Audit config holding the MQ info list
+ */
+ AuditConfig getAuditConfig(String clusterTag);
+
/**
* Test whether the connection can be successfully established.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 7d04689f5..7d9b05709 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -26,6 +26,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.common.pojo.audit.AuditConfig;
+import org.apache.inlong.common.pojo.audit.MQInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
@@ -1329,6 +1331,27 @@ public class InlongClusterServiceImpl implements InlongClusterService {
return configJson;
}
+ /**
+ * Build the Audit config from the TubeMQ, Pulsar and Kafka clusters selected by the given cluster tag.
+ *
+ * @param clusterTag cluster tag used to filter the clusters
+ * @return Audit config holding one MQ info (url, type, ext params) per selected cluster
+ */
+ @Override
+ public AuditConfig getAuditConfig(String clusterTag) {
+ AuditConfig auditConfig = new AuditConfig();
+ ClusterPageRequest request = ClusterPageRequest.builder()
+ .clusterTag(clusterTag)
+ .typeList(Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR, ClusterType.KAFKA))
+ .build();
+ List<InlongClusterEntity> clusterEntityList = clusterMapper.selectByCondition(request);
+ LOGGER.info("clusterEntityList {}", clusterEntityList);
+ List<MQInfo> mqInfoList = new ArrayList<>();
+ for (InlongClusterEntity entity : clusterEntityList) {
+ MQInfo info = new MQInfo();
+ info.setUrl(entity.getUrl());
+ info.setMqType(entity.getType());
+ // fromJson can return null (Gson does so for a null or empty JSON string); in that case
+ // keep the empty map MQInfo is initialized with instead of overwriting it with null.
+ @SuppressWarnings("unchecked")
+ Map<String, String> params = GSON.fromJson(entity.getExtParams(), Map.class);
+ if (params != null) {
+ info.setParams(params);
+ }
+ mqInfoList.add(info);
+ }
+ auditConfig.setMqInfoList(mqInfoList);
+ return auditConfig;
+ }
+
@Override
public Boolean testConnection(ClusterRequest request) {
LOGGER.info("begin test connection for: {}", request);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java
new file mode 100644
index 000000000..f865394f2
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AuditController.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller.openapi;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.pojo.audit.AuditConfig;
+import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Open API endpoints through which Audit fetches its MQ configuration.
+ */
+@RestController("OpenAuditController")
+@RequestMapping("/openapi")
+@Api(tags = "Open-Audit-API")
+public class AuditController {
+
+ @Autowired
+ private InlongClusterService clusterService;
+
+ /**
+ * Return the Audit config for the cluster tag carried by the request,
+ * or a failure response when its MQ info list is empty.
+ */
+ @PostMapping("/audit/getConfig")
+ @ApiOperation(value = "Get mq config list")
+ public Response<AuditConfig> getConfig(@RequestBody AuditConfigRequest request) {
+ final String clusterTag = request.getClusterTag();
+ final AuditConfig config = clusterService.getAuditConfig(clusterTag);
+ final boolean hasMqInfo = !CollectionUtils.isEmpty(config.getMqInfoList());
+ if (hasMqInfo) {
+ return Response.success(config);
+ }
+ return Response.fail("Failed to get MQ config of cluster tag: " + clusterTag);
+ }
+}