You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/16 07:45:41 UTC

[incubator-inlong] branch master updated: [INLONG-2009][InLong-Manager] Topic obtained through "openapi/dataproxy/getConfig" is not right (#2010)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6566ab8  [INLONG-2009][InLong-Manager] Topic obtained through "openapi/dataproxy/getConfig" is not right (#2010)
6566ab8 is described below

commit 6566ab866a4bf533d10e336a794ade22773bca94
Author: healchow <he...@gmail.com>
AuthorDate: Thu Dec 16 15:42:32 2021 +0800

    [INLONG-2009][InLong-Manager] Topic obtained through "openapi/dataproxy/getConfig" is not right (#2010)
---
 .../manager/dao/mapper/BusinessEntityMapper.java   |  2 +
 .../resources/mappers/BusinessEntityMapper.xml     |  8 +++
 .../service/core/impl/BusinessServiceImpl.java     |  7 ++-
 .../core/impl/DataProxyClusterServiceImpl.java     | 58 ++++++++++++++--------
 4 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java
index 37d8434..50128f0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessEntityMapper.java
@@ -44,6 +44,8 @@ public interface BusinessEntityMapper {
 
     List<BusinessEntity> selectByCondition(BusinessPageRequest request);
 
+    List<BusinessEntity> selectAll(Integer status);
+
     /**
      * get all config with business status of 130, that is, config successful
      */
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
index 2d7b41d..270abf3 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
@@ -82,6 +82,14 @@
         where inlong_group_id = #{groupId, jdbcType=VARCHAR}
           and is_deleted = 0
     </select>
+    <select id="selectAll" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from business
+        where is_deleted = 0
+        and status = #{status, jdbcType=VARCHAR}
+        order by modify_time desc
+    </select>
     <select id="selectByCondition" resultMap="BaseResultMap"
             parameterType="org.apache.inlong.manager.common.pojo.business.BusinessPageRequest">
         select
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index 479e1b9..074c9df 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -81,9 +81,8 @@ public class BusinessServiceImpl implements BusinessService {
         String bizName = businessInfo.getName();
         Preconditions.checkNotNull(bizName, "business name is empty");
 
-        String topic = bizName.toLowerCase(Locale.ROOT);
-        // groupId=b_topic, cannot update
-        String groupId = "b_" + topic;
+        // groupId=b_bizName, cannot update
+        String groupId = "b_" + bizName.toLowerCase(Locale.ROOT);
         Integer count = businessMapper.selectIdentifierExist(groupId);
         if (count >= 1) {
             LOGGER.error("groupId [{}] has already exists", groupId);
@@ -93,7 +92,7 @@ public class BusinessServiceImpl implements BusinessService {
         // Processing business and extended information
         BusinessEntity entity = CommonBeanUtils.copyProperties(businessInfo, BusinessEntity::new);
         entity.setInlongGroupId(groupId);
-        entity.setMqResourceObj(topic);
+        entity.setMqResourceObj(groupId);
 
         // Only M0 is currently supported
         entity.setSchemaName(BizConstant.SCHEMA_M0_DAY);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
index 001ecbf..2709489 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataProxyClusterServiceImpl.java
@@ -17,13 +17,18 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import com.google.gson.Gson;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.commons.pojo.dataproxy.DataProxyConfigResponse;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -34,11 +39,13 @@ import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpRequest;
 import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyIpResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessEntity;
 import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
 import org.apache.inlong.manager.dao.entity.DataProxyConfig;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
 import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
 import org.apache.inlong.manager.dao.mapper.DataProxyClusterEntityMapper;
-import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
 import org.apache.inlong.manager.service.core.DataProxyClusterService;
 import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
 import org.slf4j.Logger;
@@ -47,13 +54,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import com.github.pagehelper.PageInfo;
-import com.google.gson.Gson;
-
-import lombok.extern.slf4j.Slf4j;
-
 /**
  * DataProxy cluster service layer implementation class
  */
@@ -66,11 +66,13 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
     @Autowired
     private DataProxyClusterEntityMapper dataProxyClusterMapper;
     @Autowired
-    private SourceFileDetailEntityMapper sourceFileDetailMapper;
+    private BusinessEntityMapper businessMapper;
     @Autowired
-    private BusinessEntityMapper businessEntityMapper;
+    private DataStreamEntityMapper dataStreamMapper;
     @Autowired
     private DataProxyConfigRepository proxyRepository;
+    @Autowired
+    private ClusterBean clusterBean;
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
@@ -211,9 +213,28 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
     @Override
     public List<DataProxyConfig> getConfig() {
         // get all configs with business status of 130, that is, config successful
-        List<DataProxyConfig> configList = businessEntityMapper.selectDataProxyConfig();
-        if (configList == null) {
-            configList = Collections.emptyList();
+        // TODO Optimize query conditions
+        List<BusinessEntity> bizEntityList = businessMapper.selectAll(EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode());
+        List<DataProxyConfig> configList = new ArrayList<>();
+        for (BusinessEntity entity : bizEntityList) {
+            String groupId = entity.getInlongGroupId();
+            String bizResource = entity.getMqResourceObj();
+
+            DataProxyConfig config = new DataProxyConfig();
+            config.setM(entity.getSchemaName());
+            if (BizConstant.MIDDLEWARE_TUBE.equals(entity.getMiddlewareType())) {
+                config.setInlongGroupId(groupId);
+                config.setTopic(bizResource);
+            } else if (BizConstant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
+                List<DataStreamEntity> streamList = dataStreamMapper.selectByGroupId(groupId);
+                for (DataStreamEntity stream : streamList) {
+                    String streamId = stream.getInlongStreamId();
+                    config.setInlongGroupId(groupId + "/" + streamId);
+                    config.setTopic("persistent://" + clusterBean.getDefaultTenant() + "/" + groupId + "/" + streamId);
+
+                }
+            }
+            configList.add(config);
         }
 
         return configList;
@@ -221,10 +242,7 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
 
     /**
      * query data proxy config by cluster id
-     * 
-     * @param clusterName
-     * @param setName
-     * @param md5
+     *
      * @return data proxy config
      */
     public String getAllConfig(String clusterName, String setName, String md5) {
@@ -245,8 +263,6 @@ public class DataProxyClusterServiceImpl implements DataProxyClusterService {
 
     /**
      * getErrorAllConfig
-     * 
-     * @return
      */
     private String getErrorAllConfig() {
         DataProxyConfigResponse response = new DataProxyConfigResponse();