You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/16 07:45:41 UTC
[incubator-inlong] branch master updated: [INLONG-2009][InLong-Manager] Topic obtained through "openapi/dataproxy/getConfig" is not right (#2010)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6566ab8 [INLONG-2009][InLong-Manager] Topic obtained through "openapi/dataproxy/getConfig" is not right (#2010)
6566ab8 is described below
commit 6566ab866a4bf533d10e336a794ade22773bca94
Author: healchow <he...@gmail.com>
AuthorDate: Thu Dec 16 15:42:32 2021 +0800
[INLONG-2009][InLong-Manager] Topic obtained through "openapi/dataproxy/getConfig" is not right (#2010)
---
.../manager/dao/mapper/BusinessEntityMapper.java | 2 +
.../resources/mappers/BusinessEntityMapper.xml | 8 +++
.../service/core/impl/BusinessServiceImpl.java | 7 ++-
.../core/impl/DataProxyClusterServiceImpl.java | 58 ++++++++++++++--------
4 files changed, 50 insertions(+), 25 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java
index 37d8434..50128f0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java
@@ -44,6 +44,8 @@ public interface BusinessEntityMapper {
List<BusinessEntity> selectByCondition(BusinessPageRequest request);
+ List<BusinessEntity> selectAll(Integer status);
+
/**
* get all config with business status of 130, that is, config successful
*/
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
index 2d7b41d..270abf3 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
@@ -82,6 +82,14 @@
where inlong_group_id = #{groupId, jdbcType=VARCHAR}
and is_deleted = 0
</select>
+ <select id="selectAll" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from business
+ where is_deleted = 0
+ and status = #{status, jdbcType=VARCHAR}
+ order by modify_time desc
+ </select>
<select id="selectByCondition" resultMap="BaseResultMap"
parameterType="org.apache.inlong.manager.common.pojo.business.BusinessPageRequest">
select
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index 479e1b9..074c9df 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -81,9 +81,8 @@ public class BusinessServiceImpl implements BusinessService {
String bizName = businessInfo.getName();
Preconditions.checkNotNull(bizName, "business name is empty");
- String topic = bizName.toLowerCase(Locale.ROOT);
- // groupId=b_topic, cannot update
- String groupId = "b_" + topic;
+ // groupId=b_bizName, cannot update
+ String groupId = "b_" + bizName.toLowerCase(Locale.ROOT);
Integer count = businessMapper.selectIdentifierExist(groupId);
if (count >= 1) {
LOGGER.error("groupId [{}] has already exists", groupId);
@@ -93,7 +92,7 @@ public class BusinessServiceImpl implements BusinessService {
// Processing business and extended information
BusinessEntity entity = CommonBeanUtils.copyProperties(businessInfo, BusinessEntity::new);
entity.setInlongGroupId(groupId);
- entity.setMqResourceObj(topic);
+ entity.setMqResourceObj(groupId);
// Only M0 is currently supported
entity.setSchemaName(BizConstant.SCHEMA_M0_DAY);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
index 001ecbf..2709489 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
@@ -17,13 +17,18 @@
package org.apache.inlong.manager.service.core.impl;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import com.google.gson.Gson;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.List;
-
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.commons.pojo.dataproxy.DataProxyConfigResponse;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -34,11 +39,13 @@ import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessEntity;
import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
import org.apache.inlong.manager.dao.entity.DataProxyConfig;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
import org.apache.inlong.manager.dao.mapper.DataProxyClusterEntityMapper;
-import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
import org.apache.inlong.manager.service.core.DataProxyClusterService;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
import org.slf4j.Logger;
@@ -47,13 +54,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import com.github.pagehelper.PageInfo;
-import com.google.gson.Gson;
-
-import lombok.extern.slf4j.Slf4j;
-
/**
* DataProxy cluster service layer implementation class
*/
@@ -66,11 +66,13 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
@Autowired
private DataProxyClusterEntityMapper dataProxyClusterMapper;
@Autowired
- private SourceFileDetailEntityMapper sourceFileDetailMapper;
+ private BusinessEntityMapper businessMapper;
@Autowired
- private BusinessEntityMapper businessEntityMapper;
+ private DataStreamEntityMapper dataStreamMapper;
@Autowired
private DataProxyConfigRepository proxyRepository;
+ @Autowired
+ private ClusterBean clusterBean;
@Transactional(rollbackFor = Throwable.class)
@Override
@@ -211,9 +213,28 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
@Override
public List<DataProxyConfig> getConfig() {
// get all configs with business status of 130, that is, config successful
- List<DataProxyConfig> configList = businessEntityMapper.selectDataProxyConfig();
- if (configList == null) {
- configList = Collections.emptyList();
+ // TODO Optimize query conditions
+ List<BusinessEntity> bizEntityList = businessMapper.selectAll(EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode());
+ List<DataProxyConfig> configList = new ArrayList<>();
+ for (BusinessEntity entity : bizEntityList) {
+ String groupId = entity.getInlongGroupId();
+ String bizResource = entity.getMqResourceObj();
+
+ DataProxyConfig config = new DataProxyConfig();
+ config.setM(entity.getSchemaName());
+ if (BizConstant.MIDDLEWARE_TUBE.equals(entity.getMiddlewareType())) {
+ config.setInlongGroupId(groupId);
+ config.setTopic(bizResource);
+ } else if (BizConstant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
+ List<DataStreamEntity> streamList = dataStreamMapper.selectByGroupId(groupId);
+ for (DataStreamEntity stream : streamList) {
+ String streamId = stream.getInlongStreamId();
+ config.setInlongGroupId(groupId + "/" + streamId);
+ config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + groupId + "/" + streamId);
+
+ }
+ }
+ configList.add(config);
}
return configList;
@@ -221,10 +242,7 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
/**
* query data proxy config by cluster id
- *
- * @param clusterName
- * @param setName
- * @param md5
+ *
* @return data proxy config
*/
public String getAllConfig(String clusterName, String setName, String md5) {
@@ -245,8 +263,6 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
/**
* getErrorAllConfig
- *
- * @return
*/
private String getErrorAllConfig() {
DataProxyConfigResponse response = new DataProxyConfigResponse();