You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/27 08:21:10 UTC

[incubator-inlong] branch master updated: [INLONG-2751][Manager] Fix the response code while query failing (#2767)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 96583b4  [INLONG-2751][Manager] Fix the response code while query failing (#2767)
96583b4 is described below

commit 96583b4c0d9a94a8f0d50b81bd6a991e99e80e50
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Sun Feb 27 16:21:02 2022 +0800

    [INLONG-2751][Manager] Fix the response code while query failing (#2767)
---
 .../inlong/dataproxy/config/ConfigManager.java     | 18 ++++++++------
 .../inlong/dataproxy/config/RemoteConfigJson.java  | 28 ++++++++--------------
 .../thirdparty/mq/CreateTubeGroupTaskListener.java | 10 ++++----
 .../controller/openapi/DataProxyController.java    |  8 +++++--
 4 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 5f79582..0cf74d7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -110,13 +110,13 @@ public class ConfigManager {
     /**
      * update old maps, reload local files if changed.
      *
-     * @param result        - map pending to be added
-     * @param holder        - property holder
+     * @param result - map pending to be added
+     * @param holder - property holder
      * @param addElseRemove - if add(true) else remove(false)
      * @return true if changed else false.
      */
     private boolean updatePropertiesHolder(Map<String, String> result,
-                                           PropertiesConfigHolder holder, boolean addElseRemove) {
+            PropertiesConfigHolder holder, boolean addElseRemove) {
         Map<String, String> tmpHolder = holder.forkHolder();
         boolean changed = false;
         for (Map.Entry<String, String> entry : result.entrySet()) {
@@ -272,7 +272,8 @@ public class ConfigManager {
                 if (StringUtils.isEmpty(proxyClusterName)) {
                     LOG.error("proxyClusterName is null");
                 }
-                String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getConfig_v2?clusterName=" + proxyClusterName;
+                String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getConfig_v2?clusterName="
+                        + proxyClusterName;
                 LOG.info("start to request {} to get config info", url);
                 httpGet = new HttpGet(url);
                 httpGet.addHeader(HttpHeaders.CONNECTION, "close");
@@ -287,14 +288,15 @@ public class ConfigManager {
                 Map<String, String> groupIdToMValue = new HashMap<String, String>();
                 Map<String, String> mqConfig = new HashMap<>();// include url2token and other params
 
-                if (configJson.getErrCode() == 0) {
+                if (configJson.isSuccess() && configJson.getData() != null) { //success get config
+                    LOG.info("getConfig_v2 result: {}", returnStr);
                     /*
                      * get mqUrls <->token maps;
                      * if mq is pulsar, store format: third-party-cluster.index1=cluster1url1,cluster1url2=token
                      * if mq is tubemq, token is "", store format: third-party-cluster.index1=cluster1url1,cluster1url2=
                      */
                     int index = 1;
-                    List<ThirdPartyClusterInfo> clusterSet = configJson.getPulsarSet();
+                    List<ThirdPartyClusterInfo> clusterSet = configJson.getData().getMqSet();
                     for (ThirdPartyClusterInfo mqCluster : clusterSet) {
                         String key = ThirdPartyClusterConfigHolder.URL_STORE_PREFIX + index;
                         String value = mqCluster.getUrl() + AttributeConstants.KEY_VALUE_SEPARATOR
@@ -305,7 +307,7 @@ public class ConfigManager {
                     // mq other params
                     mqConfig.putAll(clusterSet.get(0).getParams());
 
-                    for (DataProxyConfig topic : configJson.getTopicList()) {
+                    for (DataProxyConfig topic : configJson.getData().getTopicList()) {
                         groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
                         groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
                     }
@@ -317,6 +319,8 @@ public class ConfigManager {
                     configManager.getThirdPartyClusterConfig().putAll(clusterSet.get(0).getParams());
                     configManager.getThirdPartyClusterHolder()
                             .setUrl2token(configManager.getThirdPartyClusterHolder().getUrl2token());
+                } else {
+                    LOG.error("getConfig from manager: {}", configJson.getErrMsg());
                 }
             } catch (Exception ex) {
                 LOG.error("exception caught", ex);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
index 4455489..7182d23 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
@@ -17,32 +17,24 @@
 
 package org.apache.inlong.dataproxy.config;
 
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterInfo;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
 
 public class RemoteConfigJson {
 
-    private boolean result;
-    private int errCode;
-    private List<ThirdPartyClusterInfo> pulsarSet = new ArrayList<>();
-    private List<DataProxyConfig> topicList = new ArrayList<>();
+    private boolean success;
+    private String errMsg;
+    private ThirdPartyClusterDTO data;
 
-    public boolean isResult() {
-        return result;
+    public boolean isSuccess() {
+        return success;
     }
 
-    public int getErrCode() {
-        return errCode;
+    public String getErrMsg() {
+        return errMsg;
     }
 
-    public List<ThirdPartyClusterInfo> getPulsarSet() {
-        return pulsarSet;
+    public ThirdPartyClusterDTO getData() {
+        return data;
     }
 
-    public List<DataProxyConfig> getTopicList() {
-        return topicList;
-    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
index 9beb71b..4f4f71e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
@@ -19,19 +19,20 @@ package org.apache.inlong.manager.service.thirdparty.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ReTryConfigBean;
+import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
 import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.util.Collections;
@@ -45,12 +46,10 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
 
     @Autowired
     TubeMqOptService tubeMqOptService;
-
-    @Value("${cluster.tube.clusterId}")
-    Integer clusterId;
-
     @Autowired
     ReTryConfigBean reTryConfigBean;
+    @Autowired
+    private CommonOperateService commonOperateService;
 
     @Override
     public TaskEvent event() {
@@ -65,6 +64,7 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
 
         InlongGroupRequest groupInfo = groupService.get(groupId);
         String topicName = groupInfo.getMqResourceObj();
+        int clusterId = Integer.parseInt(commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_CLUSTER_ID));
         QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
                 .topicName(topicName).clusterId(clusterId)
                 .user(groupInfo.getCreator()).build();
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index b10619b..3c3c630 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -19,11 +19,11 @@ package org.apache.inlong.manager.web.controller.openapi;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest;
 import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.manager.service.core.DataProxyClusterService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -64,7 +64,11 @@ public class DataProxyController {
     @GetMapping("/getConfig_v2")
     @ApiOperation(value = "get dataproxy config list, including pulsar cluster config and topic")
     public Response<ThirdPartyClusterDTO> getConfigV2(@RequestParam("clusterName") String clusterName) {
-        return Response.success(dataProxyClusterService.getConfigV2(clusterName));
+        ThirdPartyClusterDTO dto = dataProxyClusterService.getConfigV2(clusterName);
+        if (dto.getMqSet().isEmpty() || dto.getTopicList().isEmpty()) {
+            return Response.fail("fail to get mq config or topics");
+        }
+        return Response.success(dto);
     }
 
     @GetMapping("/getAllConfig")