You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/27 08:21:10 UTC
[incubator-inlong] branch master updated: [INLONG-2751][Manager] Fix the response code while query failing (#2767)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 96583b4 [INLONG-2751][Manager] Fix the response code while query failing (#2767)
96583b4 is described below
commit 96583b4c0d9a94a8f0d50b81bd6a991e99e80e50
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Sun Feb 27 16:21:02 2022 +0800
[INLONG-2751][Manager] Fix the response code while query failing (#2767)
---
.../inlong/dataproxy/config/ConfigManager.java | 18 ++++++++------
.../inlong/dataproxy/config/RemoteConfigJson.java | 28 ++++++++--------------
.../thirdparty/mq/CreateTubeGroupTaskListener.java | 10 ++++----
.../controller/openapi/DataProxyController.java | 8 +++++--
4 files changed, 32 insertions(+), 32 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index 5f79582..0cf74d7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -110,13 +110,13 @@ public class ConfigManager {
/**
* update old maps, reload local files if changed.
*
- * @param result - map pending to be added
- * @param holder - property holder
+ * @param result - map pending to be added
+ * @param holder - property holder
* @param addElseRemove - if add(true) else remove(false)
* @return true if changed else false.
*/
private boolean updatePropertiesHolder(Map<String, String> result,
- PropertiesConfigHolder holder, boolean addElseRemove) {
+ PropertiesConfigHolder holder, boolean addElseRemove) {
Map<String, String> tmpHolder = holder.forkHolder();
boolean changed = false;
for (Map.Entry<String, String> entry : result.entrySet()) {
@@ -272,7 +272,8 @@ public class ConfigManager {
if (StringUtils.isEmpty(proxyClusterName)) {
LOG.error("proxyClusterName is null");
}
- String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getConfig_v2?clusterName=" + proxyClusterName;
+ String url = "http://" + host + "/api/inlong/manager/openapi/dataproxy/getConfig_v2?clusterName="
+ + proxyClusterName;
LOG.info("start to request {} to get config info", url);
httpGet = new HttpGet(url);
httpGet.addHeader(HttpHeaders.CONNECTION, "close");
@@ -287,14 +288,15 @@ public class ConfigManager {
Map<String, String> groupIdToMValue = new HashMap<String, String>();
Map<String, String> mqConfig = new HashMap<>();// include url2token and other params
- if (configJson.getErrCode() == 0) {
+ if (configJson.isSuccess() && configJson.getData() != null) { //success get config
+ LOG.info("getConfig_v2 result: {}", returnStr);
/*
* get mqUrls <->token maps;
* if mq is pulsar, store format: third-party-cluster.index1=cluster1url1,cluster1url2=token
* if mq is tubemq, token is "", store format: third-party-cluster.index1=cluster1url1,cluster1url2=
*/
int index = 1;
- List<ThirdPartyClusterInfo> clusterSet = configJson.getPulsarSet();
+ List<ThirdPartyClusterInfo> clusterSet = configJson.getData().getMqSet();
for (ThirdPartyClusterInfo mqCluster : clusterSet) {
String key = ThirdPartyClusterConfigHolder.URL_STORE_PREFIX + index;
String value = mqCluster.getUrl() + AttributeConstants.KEY_VALUE_SEPARATOR
@@ -305,7 +307,7 @@ public class ConfigManager {
// mq other params
mqConfig.putAll(clusterSet.get(0).getParams());
- for (DataProxyConfig topic : configJson.getTopicList()) {
+ for (DataProxyConfig topic : configJson.getData().getTopicList()) {
groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
}
@@ -317,6 +319,8 @@ public class ConfigManager {
configManager.getThirdPartyClusterConfig().putAll(clusterSet.get(0).getParams());
configManager.getThirdPartyClusterHolder()
.setUrl2token(configManager.getThirdPartyClusterHolder().getUrl2token());
+ } else {
+ LOG.error("getConfig from manager: {}", configJson.getErrMsg());
}
} catch (Exception ex) {
LOG.error("exception caught", ex);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
index 4455489..7182d23 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigJson.java
@@ -17,32 +17,24 @@
package org.apache.inlong.dataproxy.config;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
-import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterInfo;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
public class RemoteConfigJson {
- private boolean result;
- private int errCode;
- private List<ThirdPartyClusterInfo> pulsarSet = new ArrayList<>();
- private List<DataProxyConfig> topicList = new ArrayList<>();
+ private boolean success;
+ private String errMsg;
+ private ThirdPartyClusterDTO data;
- public boolean isResult() {
- return result;
+ public boolean isSuccess() {
+ return success;
}
- public int getErrCode() {
- return errCode;
+ public String getErrMsg() {
+ return errMsg;
}
- public List<ThirdPartyClusterInfo> getPulsarSet() {
- return pulsarSet;
+ public ThirdPartyClusterDTO getData() {
+ return data;
}
- public List<DataProxyConfig> getTopicList() {
- return topicList;
- }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
index 9beb71b..4f4f71e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
@@ -19,19 +19,20 @@ package org.apache.inlong.manager.service.thirdparty.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ReTryConfigBean;
+import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Collections;
@@ -45,12 +46,10 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
@Autowired
TubeMqOptService tubeMqOptService;
-
- @Value("${cluster.tube.clusterId}")
- Integer clusterId;
-
@Autowired
ReTryConfigBean reTryConfigBean;
+ @Autowired
+ private CommonOperateService commonOperateService;
@Override
public TaskEvent event() {
@@ -65,6 +64,7 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
InlongGroupRequest groupInfo = groupService.get(groupId);
String topicName = groupInfo.getMqResourceObj();
+ int clusterId = Integer.parseInt(commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_CLUSTER_ID));
QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
.topicName(topicName).clusterId(clusterId)
.user(groupInfo.getCreator()).build();
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
index b10619b..3c3c630 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DataProxyController.java
@@ -19,11 +19,11 @@ package org.apache.inlong.manager.web.controller.openapi;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.ThirdPartyClusterDTO;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse;
-import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.manager.service.core.DataProxyClusterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
@@ -64,7 +64,11 @@ public class DataProxyController {
@GetMapping("/getConfig_v2")
@ApiOperation(value = "get dataproxy config list, including pulsar cluster config and topic")
public Response<ThirdPartyClusterDTO> getConfigV2(@RequestParam("clusterName") String clusterName) {
- return Response.success(dataProxyClusterService.getConfigV2(clusterName));
+ ThirdPartyClusterDTO dto = dataProxyClusterService.getConfigV2(clusterName);
+ if (dto.getMqSet().isEmpty() || dto.getTopicList().isEmpty()) {
+ return Response.fail("fail to get mq config or topics");
+ }
+ return Response.success(dto);
}
@GetMapping("/getAllConfig")