You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/20 06:02:57 UTC
[inlong] branch branch-1.6 updated: [INLONG-7646][Audit] Fix NPE when mq configuration is not registered (#7647)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new ed0b5d520 [INLONG-7646][Audit] Fix NPE when mq configuration is not registered (#7647)
ed0b5d520 is described below
commit ed0b5d520abc9cfe87501eb22f624eef9ae323e9
Author: haifxu <xh...@gmail.com>
AuthorDate: Mon Mar 20 14:01:24 2023 +0800
[INLONG-7646][Audit] Fix NPE when mq configuration is not registered (#7647)
---
.../audit/service/AuditMsgConsumerServer.java | 32 ++++++++++++++--------
1 file changed, 20 insertions(+), 12 deletions(-)
diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index fe722a4a9..c23c23172 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -37,6 +37,7 @@ import org.apache.inlong.audit.service.consume.BaseConsume;
import org.apache.inlong.audit.service.consume.KafkaConsume;
import org.apache.inlong.audit.service.consume.PulsarConsume;
import org.apache.inlong.audit.service.consume.TubeConsume;
+import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
import org.apache.inlong.common.pojo.audit.MQInfo;
import org.slf4j.Logger;
@@ -45,7 +46,6 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@@ -70,6 +70,9 @@ public class AuditMsgConsumerServer implements InitializingBean {
private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties";
+ // interval time of getting mq config
+ private static final int INTERVAL_MS = 5000;
+
private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
private final Gson gson = new Gson();
@@ -83,24 +86,25 @@ public class AuditMsgConsumerServer implements InitializingBean {
List<InsertData> insertServiceList = this.getInsertServiceList();
for (MQInfo mqInfo : mqInfoList) {
- if (mqConfig.isPulsar()) {
+ if (mqConfig.isPulsar() && MQType.PULSAR.equals(mqInfo.getMqType())) {
mqConfig.setPulsarServerUrl(mqInfo.getUrl());
mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
break;
- } else if (mqConfig.isTube()) {
+ } else if (mqConfig.isTube() && MQType.TUBEMQ.equals(mqInfo.getMqType())) {
mqConfig.setTubeMasterList(mqInfo.getUrl());
mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
break;
- } else if (mqConfig.isKafka()) {
+ } else if (mqConfig.isKafka() && MQType.KAFKA.equals(mqInfo.getMqType())) {
mqConfig.setKafkaServerUrl(mqInfo.getUrl());
mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
break;
- } else {
- LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
- return;
}
}
+ if (mqConsume == null) {
+ LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
+ }
+
if (storeConfig.isElasticsearchStore()) {
esService.startTimerRoutine();
}
@@ -133,19 +137,23 @@ public class AuditMsgConsumerServer implements InitializingBean {
private List<MQInfo> getClusterFromManager() {
Properties properties = new Properties();
+ List<MQInfo> mqConfig;
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES)) {
properties.load(inputStream);
String managerHosts = properties.getProperty("manager.hosts");
String clusterTag = properties.getProperty("proxy.cluster.tag");
String[] hostList = StringUtils.split(managerHosts, ",");
for (String host : hostList) {
- List<MQInfo> mqConfig = getMQConfig(host, clusterTag);
- if (ObjectUtils.isNotEmpty(mqConfig)) {
- LOG.info("return mqConfig");
- return mqConfig;
+ while (true) {
+ mqConfig = getMQConfig(host, clusterTag);
+ if (ObjectUtils.isNotEmpty(mqConfig)) {
+ return mqConfig;
+ }
+ LOG.info("MQ config may not be registered yet, wait for 5s and try again");
+ Thread.sleep(INTERVAL_MS);
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
return null;