You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/11/24 11:52:27 UTC

[inlong] branch master updated: [INLONG-6610][Manager] Support the saving and query of data report-to settings (#6614)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new be448d672 [INLONG-6610][Manager] Support the saving and query of data report-to settings (#6614)
be448d672 is described below

commit be448d672e42b671bb29518949e5fccd0457a06f
Author: Goson Zhang <46...@qq.com>
AuthorDate: Thu Nov 24 19:52:21 2022 +0800

    [INLONG-6610][Manager] Support the saving and query of data report-to settings (#6614)
    
    
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../inlong/common/pojo/agent/DataConfig.java       | 27 ++++++++
 .../manager/common/consts/InlongConstants.java     | 12 ++++
 .../manager/dao/entity/InlongGroupEntity.java      |  1 +
 .../resources/mappers/InlongGroupEntityMapper.xml  | 33 +++++----
 .../pojo/group/InlongGroupApproveRequest.java      |  7 ++
 .../inlong/manager/pojo/group/InlongGroupInfo.java |  9 ++-
 .../manager/pojo/group/InlongGroupRequest.java     | 21 ++++--
 .../service/core/impl/AgentServiceImpl.java        | 81 ++++++++++++++++++----
 .../service/group/AbstractGroupOperator.java       |  1 -
 .../manager/service/group/InlongGroupService.java  |  3 +-
 .../inlong/manager/service/ServiceBaseTest.java    |  2 -
 .../manager/service/group/GroupServiceTest.java    |  7 +-
 .../manager/service/sort/SortServiceImplTest.java  | 16 ++---
 .../main/resources/h2/apache_inlong_manager.sql    |  1 +
 .../manager-web/sql/apache_inlong_manager.sql      |  1 +
 15 files changed, 173 insertions(+), 49 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index dae48131d..e86c1142e 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -18,6 +18,10 @@
 package org.apache.inlong.common.pojo.agent;
 
 import lombok.Data;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+
+import java.util.List;
 
 /**
  * The task config for agent.
@@ -45,6 +49,29 @@ public class DataConfig {
      * The task delivery time, format is 'yyyy-MM-dd HH:mm:ss'.
      */
     private String deliveryTime;
+    /**
+     * Data report type.
+     * The current constraint is that all InLong Agents under one InlongGroup use the same type.
+     * <p/>
+     * This constraint is not applicable to InlongStream or StreamSource, which avoids the configuration
+     * granularity and reduces the operation and maintenance costs.
+     * <p/>
+     * Supported type:
+     * <pre>
+     *     0: report to DataProxy and respond when the DataProxy received data.
+     *     1: report to DataProxy and respond after DataProxy sends data.
+     *     2: report to MQ and respond when the MQ received data.
+     * </pre>
+     */
+    private Integer dataReportType = 0;
+    /**
+     * MQ cluster information, valid when reportDataTo is 2.
+     */
+    private List<MQClusterInfo> mqClusters;
+    /**
+     * MQ's topic information, valid when reportDataTo is 2.
+     */
+    private DataProxyTopicInfo topicInfo;
 
     public boolean isValid() {
         return true;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index de96b3172..d21e14df6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -56,6 +56,18 @@ public class InlongConstants {
     public static final Integer DISABLE_CREATE_RESOURCE = 0;
     public static final Integer ENABLE_CREATE_RESOURCE = 1;
 
+    /**
+     * Data report type, support:
+     * <pre>
+     *     0: report to DataProxy and respond when the DataProxy received data.
+     *     1: report to DataProxy and respond after DataProxy sends data.
+     *     2: report to MQ and respond when the MQ received data.
+     * </pre>
+     */
+    public static final Integer REPORT_TO_DP_RECEIVED = 0;
+    public static final Integer REPORT_TO_DP_SENT = 1;
+    public static final Integer REPORT_TO_MQ_RECEIVED = 2;
+
     public static final Integer UN_SYNC_SEND = 0;
     public static final Integer SYNC_SEND = 1;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongGroupEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongGroupEntity.java
index d6e8a7051..ca7007a8f 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongGroupEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongGroupEntity.java
@@ -43,6 +43,7 @@ public class InlongGroupEntity implements Serializable {
     private Integer enableZookeeper;
     private Integer enableCreateResource;
     private Integer lightweight;
+    private Integer dataReportType;
     private String inlongClusterTag;
 
     private String extParams;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index 47a97759f..129c154ec 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -34,6 +34,7 @@
         <result column="enable_zookeeper" jdbcType="INTEGER" property="enableZookeeper"/>
         <result column="enable_create_resource" jdbcType="INTEGER" property="enableCreateResource"/>
         <result column="lightweight" jdbcType="INTEGER" property="lightweight"/>
+        <result column="data_report_type" jdbcType="INTEGER" property="dataReportType"/>
         <result column="inlong_cluster_tag" jdbcType="VARCHAR" property="inlongClusterTag"/>
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
 
@@ -55,10 +56,10 @@
     </resultMap>
 
     <sql id="Base_Column_List">
-        id, inlong_group_id, name, description, mq_type, mq_resource,
-        daily_records, daily_storage, peak_records, max_length,
-        enable_zookeeper, enable_create_resource, lightweight, inlong_cluster_tag, ext_params,
-        in_charges, followers, status, previous_status, is_deleted, creator, modifier, create_time, modify_time, version
+        id, inlong_group_id, name, description, mq_type, mq_resource, daily_records, daily_storage,
+        peak_records, max_length, enable_zookeeper, enable_create_resource, lightweight, data_report_type,
+        inlong_cluster_tag, ext_params, in_charges, followers, status, previous_status, is_deleted,
+        creator, modifier, create_time, modify_time, version
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -69,22 +70,22 @@
                                   daily_records, daily_storage,
                                   peak_records, max_length,
                                   enable_zookeeper, enable_create_resource,
-                                  lightweight, inlong_cluster_tag,
-                                  ext_params, in_charges,
-                                  followers, status,
-                                  previous_status, creator,
-                                  modifier)
+                                  lightweight, data_report_type,
+                                  inlong_cluster_tag, ext_params,
+                                  in_charges, followers,
+                                  status, previous_status,
+                                  creator, modifier)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
                 #{name,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR},
                 #{mqType,jdbcType=VARCHAR}, #{mqResource,jdbcType=VARCHAR},
                 #{dailyRecords,jdbcType=INTEGER}, #{dailyStorage,jdbcType=INTEGER},
                 #{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER},
                 #{enableZookeeper,jdbcType=INTEGER}, #{enableCreateResource,jdbcType=INTEGER},
-                #{lightweight,jdbcType=INTEGER}, #{inlongClusterTag,jdbcType=VARCHAR},
-                #{extParams,jdbcType=LONGVARCHAR}, #{inCharges,jdbcType=VARCHAR},
-                #{followers,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
-                #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
-                #{modifier,jdbcType=VARCHAR})
+                #{lightweight,jdbcType=INTEGER}, #{dataReportType,jdbcType=INTEGER},
+                #{inlongClusterTag,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+                #{inCharges,jdbcType=VARCHAR}, #{followers,jdbcType=VARCHAR},
+                #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
+                #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
     </insert>
 
     <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -232,6 +233,7 @@
             enable_zookeeper       = #{enableZookeeper,jdbcType=INTEGER},
             enable_create_resource = #{enableCreateResource,jdbcType=INTEGER},
             lightweight            = #{lightweight,jdbcType=INTEGER},
+            data_report_type       = #{dataReportType,jdbcType=INTEGER},
             inlong_cluster_tag     = #{inlongClusterTag,jdbcType=VARCHAR},
 
             ext_params             = #{extParams,jdbcType=LONGVARCHAR},
@@ -281,6 +283,9 @@
             <if test="lightweight != null">
                 lightweight = #{lightweight,jdbcType=INTEGER},
             </if>
+            <if test="dataReportType != null">
+                data_report_type = #{dataReportType,jdbcType=INTEGER},
+            </if>
             <if test="inlongClusterTag != null">
                 inlong_cluster_tag = #{inlongClusterTag,jdbcType=VARCHAR},
             </if>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
index 25e436b32..43dcc4bf9 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
@@ -77,4 +77,11 @@ public class InlongGroupApproveRequest {
     @ApiModelProperty(value = "The unit of message size")
     private String retentionSizeUnit;
 
+    @ApiModelProperty(value = "Data report type, default is 0.\n"
+            + " 0: report to DataProxy and respond when the DataProxy received data.\n"
+            + " 1: report to DataProxy and respond after DataProxy sends data.\n"
+            + " 2: report to MQ and respond when the MQ received data.",
+            notes = "Current constraint is that all InLong Agents under one InlongGroup use the same type")
+    private Integer dataReportType = 0;
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
index bedce350f..b255a9fd7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
@@ -70,11 +70,18 @@ public abstract class InlongGroupInfo extends BaseInlongGroup {
     private Integer enableZookeeper = 0;
 
     @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
-    private Integer enableCreateResource;
+    private Integer enableCreateResource = 0;
 
     @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
     private Integer lightweight;
 
+    @ApiModelProperty(value = "Data report type, default is 0.\n"
+            + " 0: report to DataProxy and respond when the DataProxy received data.\n"
+            + " 1: report to DataProxy and respond after DataProxy sends data.\n"
+            + " 2: report to MQ and respond when the MQ received data.",
+            notes = "Current constraint is that all InLong Agents under one InlongGroup use the same type")
+    private Integer dataReportType = 0;
+
     @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
     private String inlongClusterTag;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index f5e869162..d4df45763 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -25,8 +25,10 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.hibernate.validator.constraints.Length;
+import org.hibernate.validator.constraints.Range;
 
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
 import javax.validation.constraints.Pattern;
 import java.util.List;
 
@@ -41,11 +43,11 @@ import java.util.List;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
 public abstract class InlongGroupRequest extends BaseInlongGroup {
 
-    @NotBlank(message = "inlongGroupId cannot be blank")
+    @NotBlank(message = "cannot be blank")
     @ApiModelProperty(value = "Inlong group id", required = true)
-    @Length(min = 4, max = 100, message = "inlongGroupId length must be between 4 and 100")
+    @Length(min = 4, max = 100, message = "length must be between 4 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{4,100}$",
-            message = "inlongGroupId only supports lowercase letters, numbers, '-', or '_'")
+            message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
     @ApiModelProperty(value = "Inlong group name", required = true)
@@ -58,7 +60,7 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
     @ApiModelProperty(value = "MQ type, replaced by mqType")
     private String middlewareType;
 
-    @NotBlank(message = "mqType cannot be blank")
+    @NotBlank(message = "cannot be blank")
     @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
     private String mqType;
 
@@ -78,6 +80,15 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
     @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
     private Integer lightweight = 0;
 
+    @NotNull(message = "cannot be null")
+    @Range(min = 0, max = 2, message = "only supports 0, 1, 2")
+    @ApiModelProperty(value = "Data report type, default is 0.\n"
+            + " 0: report to DataProxy and respond when the DataProxy received data.\n"
+            + " 1: report to DataProxy and respond after DataProxy sends data.\n"
+            + " 2: report to MQ and respond when the MQ received data.",
+            notes = "Current constraint is that all InLong Agents under one InlongGroup use the same type")
+    private Integer dataReportType = 0;
+
     @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
     private String inlongClusterTag;
 
@@ -93,7 +104,7 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
     @ApiModelProperty(value = "The maximum length of a single piece of data, unit: Byte")
     private Integer maxLength;
 
-    @NotBlank(message = "inCharges cannot be blank")
+    @NotBlank(message = "cannot be blank")
     @ApiModelProperty(value = "Name of responsible person, separated by commas")
     private String inCharges;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 29f0dcbff..efe3d32a7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -30,17 +30,26 @@ import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.service.core.AgentService;
 import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
@@ -52,8 +61,10 @@ import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
@@ -90,7 +101,11 @@ public class AgentServiceImpl implements AgentService {
     @Autowired
     private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
     @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
     private InlongStreamEntityMapper streamMapper;
+    @Autowired
+    private InlongClusterEntityMapper clusterMapper;
 
     @Override
     public Boolean reportSnapshot(TaskSnapshotRequest request) {
@@ -337,26 +352,68 @@ public class AgentServiceImpl implements AgentService {
         String streamId = entity.getInlongStreamId();
         dataConfig.setInlongGroupId(groupId);
         dataConfig.setInlongStreamId(streamId);
+
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        if (groupEntity == null) {
+            throw new BusinessException(String.format("inlong group not found for groupId=%s", groupId));
+        }
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        if (streamEntity == null) {
+            throw new BusinessException(
+                    String.format("inlong stream not found for groupId=%s streamId=%s", groupId, streamId));
+        }
+
         String extParams = entity.getExtParams();
-        if (streamEntity != null) {
-            dataConfig.setSyncSend(streamEntity.getSyncSend());
-            if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
-                String dataSeparator = streamEntity.getDataSeparator();
-                extParams = null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams;
-            }
-        } else {
-            dataConfig.setSyncSend(0);
-            LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);
+        dataConfig.setSyncSend(streamEntity.getSyncSend());
+        if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
+            String dataSeparator = streamEntity.getDataSeparator();
+            extParams = (null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams);
         }
         dataConfig.setExtParams(extParams);
+
+        int dataReportType = groupEntity.getDataReportType();
+        dataConfig.setDataReportType(dataReportType);
+        if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
+            // add mq cluster setting
+            List<MQClusterInfo> mqSet = new ArrayList<>();
+            List<InlongClusterEntity> mqClusterList =
+                    clusterMapper.selectByClusterTag(groupEntity.getInlongClusterTag());
+            for (InlongClusterEntity cluster : mqClusterList) {
+                MQClusterInfo clusterInfo = new MQClusterInfo();
+                clusterInfo.setUrl(cluster.getUrl());
+                clusterInfo.setToken(cluster.getToken());
+                clusterInfo.setMqType(cluster.getType());
+                clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(), HashMap.class));
+                mqSet.add(clusterInfo);
+            }
+            dataConfig.setMqClusters(mqSet);
+            // add topic setting
+            InlongClusterEntity cluster = mqClusterList.get(0);
+            String mqResource = groupEntity.getMqResource();
+            String mqType = groupEntity.getMqType();
+            if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
+                PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams());
+                String tenant = pulsarCluster.getTenant();
+                if (StringUtils.isBlank(tenant)) {
+                    tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+                }
+                String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
+                        tenant, mqResource, streamEntity.getMqResource());
+                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                topicConfig.setInlongGroupId(groupId + "/" + streamId);
+                topicConfig.setTopic(topic);
+                dataConfig.setTopicInfo(topicConfig);
+            } else if (MQType.TUBEMQ.equals(mqType)) {
+                DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                topicConfig.setInlongGroupId(groupId);
+                topicConfig.setTopic(mqResource);
+                dataConfig.setTopicInfo(topicConfig);
+            }
+        }
         return dataConfig;
     }
 
     private String getExtParams(String extParams, String dataSeparator) {
-        if (Objects.isNull(extParams)) {
-            return null;
-        }
         FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
         if (Objects.nonNull(fileSourceDTO)) {
             fileSourceDTO.setDataSeparator(dataSeparator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
index eed0cba50..562bd34da 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
@@ -91,7 +91,6 @@ public abstract class AbstractGroupOperator implements InlongGroupOperator {
     @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public void updateOpt(InlongGroupRequest request, String operator) {
         InlongGroupEntity entity = CommonBeanUtils.copyProperties(request, InlongGroupEntity::new);
-
         // set the ext params
         setTargetEntity(request, entity);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 58acbfb27..258900f06 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -43,7 +43,8 @@ public interface InlongGroupService {
      * @param operator name of operator
      * @return inlong group id after saving
      */
-    String save(InlongGroupRequest groupInfo, String operator);
+    String save(@Valid @NotNull(message = "inlong group request cannot be null") InlongGroupRequest groupInfo,
+            String operator);
 
     /**
      * Query whether the specified group id exists
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index da8665fb1..a5ba89b9b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.service;
 
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
@@ -89,7 +88,6 @@ public class ServiceBaseTest extends BaseTest {
         groupInfo.setMqType(mqType);
         groupInfo.setMqResource("test-queue");
         groupInfo.setInCharges(GLOBAL_OPERATOR);
-        groupInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         groupService.save(groupInfo.genRequest(), GLOBAL_OPERATOR);
         InlongGroupInfo updateGroupInfo = groupService.get(inlongGroupId);
         groupService.updateStatus(inlongGroupId, GroupStatus.TO_BE_APPROVAL.getCode(), GLOBAL_OPERATOR);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/GroupServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/GroupServiceTest.java
index 0f277ec49..f698765ad 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/GroupServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/GroupServiceTest.java
@@ -40,8 +40,7 @@ class GroupServiceTest extends ServiceBaseTest {
                 ConstraintViolationException.class,
                 () -> groupService.update(new InlongTubeMQRequest(), ""));
 
-        Assertions.assertTrue(exception.getMessage().contains("inCharges cannot be blank"));
-        Assertions.assertTrue(exception.getMessage().contains("inlongGroupId cannot be blank"));
+        Assertions.assertTrue(exception.getMessage().contains("cannot be blank"));
     }
 
     @Test
@@ -49,8 +48,6 @@ class GroupServiceTest extends ServiceBaseTest {
         ConstraintViolationException exception = Assertions.assertThrows(ConstraintViolationException.class,
                 () -> groupService.updateAfterApprove(new InlongGroupApproveRequest(), ""));
 
-        Assertions.assertTrue(exception.getMessage().contains("mqType cannot be blank"));
-        Assertions.assertTrue(exception.getMessage().contains("inlongGroupId cannot be blank"));
-
+        Assertions.assertTrue(exception.getMessage().contains("cannot be blank"));
     }
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index d52b47486..9d4d417af 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -22,8 +22,8 @@ import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
@@ -32,18 +32,18 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
 import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.SortService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.node.DataNodeService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.json.JSONObject;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -56,8 +56,8 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.List;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -66,9 +66,9 @@ import java.util.Map;
 @TestMethodOrder(OrderAnnotation.class)
 public class SortServiceImplTest extends ServiceBaseTest {
 
-    private static final String TEST_CLUSTER = "testCluster";
-    private static final String TEST_TASK = "testTask";
-    private static final String TEST_GROUP = "testGroup";
+    private static final String TEST_GROUP = "test-group";
+    private static final String TEST_CLUSTER = "test-cluster";
+    private static final String TEST_TASK = "test-task";
     private static final String TEST_STREAM_1 = "1";
     private static final String TEST_STREAM_2 = "2";
     private static final String TEST_TAG = "testTag";
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index d5da92d43..d3210bf43 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `enable_zookeeper`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(1)            DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
     `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
+    `data_report_type`       int(4)                DEFAULT '0' COMMENT 'Data report type. 0: report to DataProxy and respond when the DataProxy received data. 1: report to DataProxy and respond after DataProxy sends data. 2: report to MQ and respond when the MQ received data',
     `inlong_cluster_tag`     varchar(128)          DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
     `ext_params`             mediumtext            DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
     `in_charges`             varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 2281110ba..509037e1c 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `enable_zookeeper`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(1)            DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
     `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
+    `data_report_type`       int(4)                DEFAULT '0' COMMENT 'Data report type. 0: report to DataProxy and respond when the DataProxy received data. 1: report to DataProxy and respond after DataProxy sends data. 2: report to MQ and respond when the MQ received data',
     `inlong_cluster_tag`     varchar(128)          DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
     `ext_params`             mediumtext            DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
     `in_charges`             varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',