You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/20 06:01:30 UTC

[inlong] branch master updated: [INLONG-7646][Audit] Fix NPE when mq configuration is not registered (#7647)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bb1b6c31 [INLONG-7646][Audit] Fix NPE when mq configuration is not registered (#7647)
9bb1b6c31 is described below

commit 9bb1b6c319b329f186644c84c0a54afa0dac7ded
Author: haifxu <xh...@gmail.com>
AuthorDate: Mon Mar 20 14:01:24 2023 +0800

    [INLONG-7646][Audit] Fix NPE when mq configuration is not registered (#7647)
---
 .../audit/service/AuditMsgConsumerServer.java      | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index fe722a4a9..c23c23172 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -37,6 +37,7 @@ import org.apache.inlong.audit.service.consume.BaseConsume;
 import org.apache.inlong.audit.service.consume.KafkaConsume;
 import org.apache.inlong.audit.service.consume.PulsarConsume;
 import org.apache.inlong.audit.service.consume.TubeConsume;
+import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.pojo.audit.AuditConfigRequest;
 import org.apache.inlong.common.pojo.audit.MQInfo;
 import org.slf4j.Logger;
@@ -45,7 +46,6 @@ import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -70,6 +70,9 @@ public class AuditMsgConsumerServer implements InitializingBean {
 
     private static final String DEFAULT_CONFIG_PROPERTIES = "application.properties";
 
+    // interval time of getting mq config
+    private static final int INTERVAL_MS = 5000;
+
     private final CloseableHttpClient httpClient = HttpClientBuilder.create().build();
 
     private final Gson gson = new Gson();
@@ -83,24 +86,25 @@ public class AuditMsgConsumerServer implements InitializingBean {
         List<InsertData> insertServiceList = this.getInsertServiceList();
 
         for (MQInfo mqInfo : mqInfoList) {
-            if (mqConfig.isPulsar()) {
+            if (mqConfig.isPulsar() && MQType.PULSAR.equals(mqInfo.getMqType())) {
                 mqConfig.setPulsarServerUrl(mqInfo.getUrl());
                 mqConsume = new PulsarConsume(insertServiceList, storeConfig, mqConfig);
                 break;
-            } else if (mqConfig.isTube()) {
+            } else if (mqConfig.isTube() && MQType.TUBEMQ.equals(mqInfo.getMqType())) {
                 mqConfig.setTubeMasterList(mqInfo.getUrl());
                 mqConsume = new TubeConsume(insertServiceList, storeConfig, mqConfig);
                 break;
-            } else if (mqConfig.isKafka()) {
+            } else if (mqConfig.isKafka() && MQType.KAFKA.equals(mqInfo.getMqType())) {
                 mqConfig.setKafkaServerUrl(mqInfo.getUrl());
                 mqConsume = new KafkaConsume(insertServiceList, storeConfig, mqConfig);
                 break;
-            } else {
-                LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
-                return;
             }
         }
 
+        if (mqConsume == null) {
+            LOG.error("Unknown MessageQueue {}", mqConfig.getMqType());
+        }
+
         if (storeConfig.isElasticsearchStore()) {
             esService.startTimerRoutine();
         }
@@ -133,19 +137,23 @@ public class AuditMsgConsumerServer implements InitializingBean {
 
     private List<MQInfo> getClusterFromManager() {
         Properties properties = new Properties();
+        List<MQInfo> mqConfig;
         try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_PROPERTIES)) {
             properties.load(inputStream);
             String managerHosts = properties.getProperty("manager.hosts");
             String clusterTag = properties.getProperty("proxy.cluster.tag");
             String[] hostList = StringUtils.split(managerHosts, ",");
             for (String host : hostList) {
-                List<MQInfo> mqConfig = getMQConfig(host, clusterTag);
-                if (ObjectUtils.isNotEmpty(mqConfig)) {
-                    LOG.info("return mqConfig");
-                    return mqConfig;
+                while (true) {
+                    mqConfig = getMQConfig(host, clusterTag);
+                    if (ObjectUtils.isNotEmpty(mqConfig)) {
+                        return mqConfig;
+                    }
+                    LOG.info("MQ config may not be registered yet, wait for 5s and try again");
+                    Thread.sleep(INTERVAL_MS);
                 }
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
         return null;