You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/11/24 11:52:27 UTC
[inlong] branch master updated: [INLONG-6610][Manager] Support the saving and query of data report-to settings (#6614)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new be448d672 [INLONG-6610][Manager] Support the saving and query of data report-to settings (#6614)
be448d672 is described below
commit be448d672e42b671bb29518949e5fccd0457a06f
Author: Goson Zhang <46...@qq.com>
AuthorDate: Thu Nov 24 19:52:21 2022 +0800
[INLONG-6610][Manager] Support the saving and query of data report-to settings (#6614)
Co-authored-by: healchow <he...@gmail.com>
---
.../inlong/common/pojo/agent/DataConfig.java | 27 ++++++++
.../manager/common/consts/InlongConstants.java | 12 ++++
.../manager/dao/entity/InlongGroupEntity.java | 1 +
.../resources/mappers/InlongGroupEntityMapper.xml | 33 +++++----
.../pojo/group/InlongGroupApproveRequest.java | 7 ++
.../inlong/manager/pojo/group/InlongGroupInfo.java | 9 ++-
.../manager/pojo/group/InlongGroupRequest.java | 21 ++++--
.../service/core/impl/AgentServiceImpl.java | 81 ++++++++++++++++++----
.../service/group/AbstractGroupOperator.java | 1 -
.../manager/service/group/InlongGroupService.java | 3 +-
.../inlong/manager/service/ServiceBaseTest.java | 2 -
.../manager/service/group/GroupServiceTest.java | 7 +-
.../manager/service/sort/SortServiceImplTest.java | 16 ++---
.../main/resources/h2/apache_inlong_manager.sql | 1 +
.../manager-web/sql/apache_inlong_manager.sql | 1 +
15 files changed, 173 insertions(+), 49 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index dae48131d..e86c1142e 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -18,6 +18,10 @@
package org.apache.inlong.common.pojo.agent;
import lombok.Data;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+
+import java.util.List;
/**
* The task config for agent.
@@ -45,6 +49,29 @@ public class DataConfig {
* The task delivery time, format is 'yyyy-MM-dd HH:mm:ss'.
*/
private String deliveryTime;
+ /**
+ * Data report type.
+ * The current constraint is that all InLong Agents under one InlongGroup use the same type.
+ * <p/>
+ * This constraint is not applicable to InlongStream or StreamSource, which avoids the configuration
+ * granularity and reduces the operation and maintenance costs.
+ * <p/>
+ * Supported type:
+ * <pre>
+ * 0: report to DataProxy and respond when the DataProxy received data.
+ * 1: report to DataProxy and respond after DataProxy sends data.
+ * 2: report to MQ and respond when the MQ received data.
+ * </pre>
+ */
+ private Integer dataReportType = 0;
+ /**
+ * MQ cluster information, valid when reportDataTo is 2.
+ */
+ private List<MQClusterInfo> mqClusters;
+ /**
+ * MQ's topic information, valid when reportDataTo is 2.
+ */
+ private DataProxyTopicInfo topicInfo;
public boolean isValid() {
return true;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index de96b3172..d21e14df6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -56,6 +56,18 @@ public class InlongConstants {
public static final Integer DISABLE_CREATE_RESOURCE = 0;
public static final Integer ENABLE_CREATE_RESOURCE = 1;
+ /**
+ * Data report type, support:
+ * <pre>
+ * 0: report to DataProxy and respond when the DataProxy received data.
+ * 1: report to DataProxy and respond after DataProxy sends data.
+ * 2: report to MQ and respond when the MQ received data.
+ * </pre>
+ */
+ public static final Integer REPORT_TO_DP_RECEIVED = 0;
+ public static final Integer REPORT_TO_DP_SENT = 1;
+ public static final Integer REPORT_TO_MQ_RECEIVED = 2;
+
public static final Integer UN_SYNC_SEND = 0;
public static final Integer SYNC_SEND = 1;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongGroupEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongGroupEntity.java
index d6e8a7051..ca7007a8f 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongGroupEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongGroupEntity.java
@@ -43,6 +43,7 @@ public class InlongGroupEntity implements Serializable {
private Integer enableZookeeper;
private Integer enableCreateResource;
private Integer lightweight;
+ private Integer dataReportType;
private String inlongClusterTag;
private String extParams;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index 47a97759f..129c154ec 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -34,6 +34,7 @@
<result column="enable_zookeeper" jdbcType="INTEGER" property="enableZookeeper"/>
<result column="enable_create_resource" jdbcType="INTEGER" property="enableCreateResource"/>
<result column="lightweight" jdbcType="INTEGER" property="lightweight"/>
+ <result column="data_report_type" jdbcType="INTEGER" property="dataReportType"/>
<result column="inlong_cluster_tag" jdbcType="VARCHAR" property="inlongClusterTag"/>
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
@@ -55,10 +56,10 @@
</resultMap>
<sql id="Base_Column_List">
- id, inlong_group_id, name, description, mq_type, mq_resource,
- daily_records, daily_storage, peak_records, max_length,
- enable_zookeeper, enable_create_resource, lightweight, inlong_cluster_tag, ext_params,
- in_charges, followers, status, previous_status, is_deleted, creator, modifier, create_time, modify_time, version
+ id, inlong_group_id, name, description, mq_type, mq_resource, daily_records, daily_storage,
+ peak_records, max_length, enable_zookeeper, enable_create_resource, lightweight, data_report_type,
+ inlong_cluster_tag, ext_params, in_charges, followers, status, previous_status, is_deleted,
+ creator, modifier, create_time, modify_time, version
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -69,22 +70,22 @@
daily_records, daily_storage,
peak_records, max_length,
enable_zookeeper, enable_create_resource,
- lightweight, inlong_cluster_tag,
- ext_params, in_charges,
- followers, status,
- previous_status, creator,
- modifier)
+ lightweight, data_report_type,
+ inlong_cluster_tag, ext_params,
+ in_charges, followers,
+ status, previous_status,
+ creator, modifier)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
#{name,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR},
#{mqType,jdbcType=VARCHAR}, #{mqResource,jdbcType=VARCHAR},
#{dailyRecords,jdbcType=INTEGER}, #{dailyStorage,jdbcType=INTEGER},
#{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER},
#{enableZookeeper,jdbcType=INTEGER}, #{enableCreateResource,jdbcType=INTEGER},
- #{lightweight,jdbcType=INTEGER}, #{inlongClusterTag,jdbcType=VARCHAR},
- #{extParams,jdbcType=LONGVARCHAR}, #{inCharges,jdbcType=VARCHAR},
- #{followers,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
- #{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
- #{modifier,jdbcType=VARCHAR})
+ #{lightweight,jdbcType=INTEGER}, #{dataReportType,jdbcType=INTEGER},
+ #{inlongClusterTag,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+ #{inCharges,jdbcType=VARCHAR}, #{followers,jdbcType=VARCHAR},
+ #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
+ #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
</insert>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -232,6 +233,7 @@
enable_zookeeper = #{enableZookeeper,jdbcType=INTEGER},
enable_create_resource = #{enableCreateResource,jdbcType=INTEGER},
lightweight = #{lightweight,jdbcType=INTEGER},
+ data_report_type = #{dataReportType,jdbcType=INTEGER},
inlong_cluster_tag = #{inlongClusterTag,jdbcType=VARCHAR},
ext_params = #{extParams,jdbcType=LONGVARCHAR},
@@ -281,6 +283,9 @@
<if test="lightweight != null">
lightweight = #{lightweight,jdbcType=INTEGER},
</if>
+ <if test="dataReportType != null">
+ data_report_type = #{dataReportType,jdbcType=INTEGER},
+ </if>
<if test="inlongClusterTag != null">
inlong_cluster_tag = #{inlongClusterTag,jdbcType=VARCHAR},
</if>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
index 25e436b32..43dcc4bf9 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupApproveRequest.java
@@ -77,4 +77,11 @@ public class InlongGroupApproveRequest {
@ApiModelProperty(value = "The unit of message size")
private String retentionSizeUnit;
+ @ApiModelProperty(value = "Data report type, default is 0.\n"
+ + " 0: report to DataProxy and respond when the DataProxy received data.\n"
+ + " 1: report to DataProxy and respond after DataProxy sends data.\n"
+ + " 2: report to MQ and respond when the MQ received data.",
+ notes = "Current constraint is that all InLong Agents under one InlongGroup use the same type")
+ private Integer dataReportType = 0;
+
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
index bedce350f..b255a9fd7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
@@ -70,11 +70,18 @@ public abstract class InlongGroupInfo extends BaseInlongGroup {
private Integer enableZookeeper = 0;
@ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
- private Integer enableCreateResource;
+ private Integer enableCreateResource = 0;
@ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
private Integer lightweight;
+ @ApiModelProperty(value = "Data report type, default is 0.\n"
+ + " 0: report to DataProxy and respond when the DataProxy received data.\n"
+ + " 1: report to DataProxy and respond after DataProxy sends data.\n"
+ + " 2: report to MQ and respond when the MQ received data.",
+ notes = "Current constraint is that all InLong Agents under one InlongGroup use the same type")
+ private Integer dataReportType = 0;
+
@ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
private String inlongClusterTag;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index f5e869162..d4df45763 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -25,8 +25,10 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.hibernate.validator.constraints.Length;
+import org.hibernate.validator.constraints.Range;
import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import java.util.List;
@@ -41,11 +43,11 @@ import java.util.List;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
public abstract class InlongGroupRequest extends BaseInlongGroup {
- @NotBlank(message = "inlongGroupId cannot be blank")
+ @NotBlank(message = "cannot be blank")
@ApiModelProperty(value = "Inlong group id", required = true)
- @Length(min = 4, max = 100, message = "inlongGroupId length must be between 4 and 100")
+ @Length(min = 4, max = 100, message = "length must be between 4 and 100")
@Pattern(regexp = "^[a-z0-9_-]{4,100}$",
- message = "inlongGroupId only supports lowercase letters, numbers, '-', or '_'")
+ message = "only supports lowercase letters, numbers, '-', or '_'")
private String inlongGroupId;
@ApiModelProperty(value = "Inlong group name", required = true)
@@ -58,7 +60,7 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@ApiModelProperty(value = "MQ type, replaced by mqType")
private String middlewareType;
- @NotBlank(message = "mqType cannot be blank")
+ @NotBlank(message = "cannot be blank")
@ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
private String mqType;
@@ -78,6 +80,15 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
private Integer lightweight = 0;
+ @NotNull(message = "cannot be null")
+ @Range(min = 0, max = 2, message = "only supports 0, 1, 2")
+ @ApiModelProperty(value = "Data report type, default is 0.\n"
+ + " 0: report to DataProxy and respond when the DataProxy received data.\n"
+ + " 1: report to DataProxy and respond after DataProxy sends data.\n"
+ + " 2: report to MQ and respond when the MQ received data.",
+ notes = "Current constraint is that all InLong Agents under one InlongGroup use the same type")
+ private Integer dataReportType = 0;
+
@ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
private String inlongClusterTag;
@@ -93,7 +104,7 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@ApiModelProperty(value = "The maximum length of a single piece of data, unit: Byte")
private Integer maxLength;
- @NotBlank(message = "inCharges cannot be blank")
+ @NotBlank(message = "cannot be blank")
@ApiModelProperty(value = "Name of responsible person, separated by commas")
private String inCharges;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 29f0dcbff..efe3d32a7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -30,17 +30,26 @@ import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
@@ -52,8 +61,10 @@ import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
@@ -90,7 +101,11 @@ public class AgentServiceImpl implements AgentService {
@Autowired
private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
@Autowired
+ private InlongGroupEntityMapper groupMapper;
+ @Autowired
private InlongStreamEntityMapper streamMapper;
+ @Autowired
+ private InlongClusterEntityMapper clusterMapper;
@Override
public Boolean reportSnapshot(TaskSnapshotRequest request) {
@@ -337,26 +352,68 @@ public class AgentServiceImpl implements AgentService {
String streamId = entity.getInlongStreamId();
dataConfig.setInlongGroupId(groupId);
dataConfig.setInlongStreamId(streamId);
+
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ if (groupEntity == null) {
+ throw new BusinessException(String.format("inlong group not found for groupId=%s", groupId));
+ }
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ if (streamEntity == null) {
+ throw new BusinessException(
+ String.format("inlong stream not found for groupId=%s streamId=%s", groupId, streamId));
+ }
+
String extParams = entity.getExtParams();
- if (streamEntity != null) {
- dataConfig.setSyncSend(streamEntity.getSyncSend());
- if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
- String dataSeparator = streamEntity.getDataSeparator();
- extParams = null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams;
- }
- } else {
- dataConfig.setSyncSend(0);
- LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);
+ dataConfig.setSyncSend(streamEntity.getSyncSend());
+ if (SourceType.FILE.equalsIgnoreCase(streamEntity.getDataType())) {
+ String dataSeparator = streamEntity.getDataSeparator();
+ extParams = (null != dataSeparator ? getExtParams(extParams, dataSeparator) : extParams);
}
dataConfig.setExtParams(extParams);
+
+ int dataReportType = groupEntity.getDataReportType();
+ dataConfig.setDataReportType(dataReportType);
+ if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
+ // add mq cluster setting
+ List<MQClusterInfo> mqSet = new ArrayList<>();
+ List<InlongClusterEntity> mqClusterList =
+ clusterMapper.selectByClusterTag(groupEntity.getInlongClusterTag());
+ for (InlongClusterEntity cluster : mqClusterList) {
+ MQClusterInfo clusterInfo = new MQClusterInfo();
+ clusterInfo.setUrl(cluster.getUrl());
+ clusterInfo.setToken(cluster.getToken());
+ clusterInfo.setMqType(cluster.getType());
+ clusterInfo.setParams(JsonUtils.parseObject(cluster.getExtParams(), HashMap.class));
+ mqSet.add(clusterInfo);
+ }
+ dataConfig.setMqClusters(mqSet);
+ // add topic setting
+ InlongClusterEntity cluster = mqClusterList.get(0);
+ String mqResource = groupEntity.getMqResource();
+ String mqType = groupEntity.getMqType();
+ if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
+ PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams());
+ String tenant = pulsarCluster.getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ }
+ String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
+ tenant, mqResource, streamEntity.getMqResource());
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+ topicConfig.setInlongGroupId(groupId + "/" + streamId);
+ topicConfig.setTopic(topic);
+ dataConfig.setTopicInfo(topicConfig);
+ } else if (MQType.TUBEMQ.equals(mqType)) {
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+ topicConfig.setInlongGroupId(groupId);
+ topicConfig.setTopic(mqResource);
+ dataConfig.setTopicInfo(topicConfig);
+ }
+ }
return dataConfig;
}
private String getExtParams(String extParams, String dataSeparator) {
- if (Objects.isNull(extParams)) {
- return null;
- }
FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
if (Objects.nonNull(fileSourceDTO)) {
fileSourceDTO.setDataSeparator(dataSeparator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
index eed0cba50..562bd34da 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
@@ -91,7 +91,6 @@ public abstract class AbstractGroupOperator implements InlongGroupOperator {
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public void updateOpt(InlongGroupRequest request, String operator) {
InlongGroupEntity entity = CommonBeanUtils.copyProperties(request, InlongGroupEntity::new);
-
// set the ext params
setTargetEntity(request, entity);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 58acbfb27..258900f06 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -43,7 +43,8 @@ public interface InlongGroupService {
* @param operator name of operator
* @return inlong group id after saving
*/
- String save(InlongGroupRequest groupInfo, String operator);
+ String save(@Valid @NotNull(message = "inlong group request cannot be null") InlongGroupRequest groupInfo,
+ String operator);
/**
* Query whether the specified group id exists
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index da8665fb1..a5ba89b9b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.service;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.GroupStatus;
@@ -89,7 +88,6 @@ public class ServiceBaseTest extends BaseTest {
groupInfo.setMqType(mqType);
groupInfo.setMqResource("test-queue");
groupInfo.setInCharges(GLOBAL_OPERATOR);
- groupInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
groupService.save(groupInfo.genRequest(), GLOBAL_OPERATOR);
InlongGroupInfo updateGroupInfo = groupService.get(inlongGroupId);
groupService.updateStatus(inlongGroupId, GroupStatus.TO_BE_APPROVAL.getCode(), GLOBAL_OPERATOR);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/GroupServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/GroupServiceTest.java
index 0f277ec49..f698765ad 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/GroupServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/GroupServiceTest.java
@@ -40,8 +40,7 @@ class GroupServiceTest extends ServiceBaseTest {
ConstraintViolationException.class,
() -> groupService.update(new InlongTubeMQRequest(), ""));
- Assertions.assertTrue(exception.getMessage().contains("inCharges cannot be blank"));
- Assertions.assertTrue(exception.getMessage().contains("inlongGroupId cannot be blank"));
+ Assertions.assertTrue(exception.getMessage().contains("cannot be blank"));
}
@Test
@@ -49,8 +48,6 @@ class GroupServiceTest extends ServiceBaseTest {
ConstraintViolationException exception = Assertions.assertThrows(ConstraintViolationException.class,
() -> groupService.updateAfterApprove(new InlongGroupApproveRequest(), ""));
- Assertions.assertTrue(exception.getMessage().contains("mqType cannot be blank"));
- Assertions.assertTrue(exception.getMessage().contains("inlongGroupId cannot be blank"));
-
+ Assertions.assertTrue(exception.getMessage().contains("cannot be blank"));
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index d52b47486..9d4d417af 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -22,8 +22,8 @@ import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
@@ -32,18 +32,18 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.SortService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.node.DataNodeService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -56,8 +56,8 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Date;
-import java.util.List;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -66,9 +66,9 @@ import java.util.Map;
@TestMethodOrder(OrderAnnotation.class)
public class SortServiceImplTest extends ServiceBaseTest {
- private static final String TEST_CLUSTER = "testCluster";
- private static final String TEST_TASK = "testTask";
- private static final String TEST_GROUP = "testGroup";
+ private static final String TEST_GROUP = "test-group";
+ private static final String TEST_CLUSTER = "test-cluster";
+ private static final String TEST_TASK = "test-task";
private static final String TEST_STREAM_1 = "1";
private static final String TEST_STREAM_2 = "2";
private static final String TEST_TAG = "testTag";
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index d5da92d43..d3210bf43 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`enable_zookeeper` tinyint(1) DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
`lightweight` tinyint(1) DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
+ `data_report_type` int(4) DEFAULT '0' COMMENT 'Data report type. 0: report to DataProxy and respond when the DataProxy received data. 1: report to DataProxy and respond after DataProxy sends data. 2: report to MQ and respond when the MQ received data',
`inlong_cluster_tag` varchar(128) DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 2281110ba..509037e1c 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -42,6 +42,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`enable_zookeeper` tinyint(1) DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
`lightweight` tinyint(1) DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
+ `data_report_type` int(4) DEFAULT '0' COMMENT 'Data report type. 0: report to DataProxy and respond when the DataProxy received data. 1: report to DataProxy and respond after DataProxy sends data. 2: report to MQ and respond when the MQ received data',
`inlong_cluster_tag` varchar(128) DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',