You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/03 10:07:34 UTC
[inlong] branch master updated: [INLONG-5302][Manager] Supports the extension of data consumption for different MQs (#5542)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 473f67b24 [INLONG-5302][Manager] Supports the extension of data consumption for different MQs (#5542)
473f67b24 is described below
commit 473f67b248b9c8c309878e4e5200fa5c1a645ccc
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Sat Sep 3 18:07:28 2022 +0800
[INLONG-5302][Manager] Supports the extension of data consumption for different MQs (#5542)
---
.../{ConsumptionStatus.java => ConsumeStatus.java} | 49 +++--
.../inlong/manager/common/enums/ErrorCodeEnum.java | 8 +
.../manager/dao/entity/InlongConsumeEntity.java} | 34 +++-
.../dao/mapper/InlongConsumeEntityMapper.java} | 35 ++--
.../mappers/InlongConsumeEntityMapper.xml | 190 +++++++++++++++++++
.../inlong/manager/pojo/common/PageRequest.java | 2 +
.../pojo/consume/InlongConsumeBriefInfo.java | 73 ++++++++
.../InlongConsumeCountInfo.java} | 23 ++-
.../manager/pojo/consume/InlongConsumeInfo.java | 89 +++++++++
.../pojo/consume/InlongConsumePageRequest.java | 69 +++++++
.../manager/pojo/consume/InlongConsumeRequest.java | 72 +++++++
.../pojo/consume/pulsar/ConsumePulsarDTO.java | 81 ++++++++
.../pojo/consume/pulsar/ConsumePulsarInfo.java | 57 ++++++
.../pojo/consume/pulsar/ConsumePulsarRequest.java | 51 +++++
.../pojo/consume/tubemq/ConsumeTubeMQDTO.java | 63 +++++++
.../tubemq/ConsumeTubeMQInfo.java} | 26 ++-
.../tubemq/ConsumeTubeMQRequest.java} | 20 +-
.../manager/pojo/group/InlongGroupRequest.java | 3 -
.../manager/pojo/group/tubemq/InlongTubeMQDTO.java | 2 +-
.../service/consume/AbstractConsumeOperator.java | 86 +++++++++
.../service/consume/ConsumePulsarOperator.java | 152 +++++++++++++++
.../service/consume/ConsumeTubeMQOperator.java | 91 +++++++++
.../service/consume/InlongConsumeOperator.java | 73 ++++++++
.../consume/InlongConsumeOperatorFactory.java | 46 +++++
.../InlongConsumeProcessService.java} | 25 ++-
.../service/consume/InlongConsumeService.java | 96 ++++++++++
.../service/consume/InlongConsumeServiceImpl.java | 208 +++++++++++++++++++++
.../service/core/impl/ConsumptionServiceImpl.java | 20 +-
.../service/group/InlongGroupProcessService.java | 2 +-
.../service/group/InlongGroupServiceImpl.java | 7 +-
.../ConsumptionCancelProcessListener.java | 4 +-
.../ConsumptionCompleteProcessListener.java | 4 +-
.../ConsumptionRejectProcessListener.java | 4 +-
.../main/resources/h2/apache_inlong_manager.sql | 31 ++-
.../manager-web/sql/apache_inlong_manager.sql | 31 ++-
.../web/controller/ConsumptionController.java | 4 +-
.../web/controller/InlongConsumeController.java | 116 ++++++++++++
37 files changed, 1849 insertions(+), 98 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
similarity index 62%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
index 05e45fe3b..0b9013df2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
@@ -21,17 +21,18 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import org.apache.inlong.manager.common.util.InlongCollectionUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
-import org.apache.inlong.manager.common.util.InlongCollectionUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
/**
- * Data consumption status
+ * Inlong consume status
*/
-@ApiModel("Data consumption status")
-public enum ConsumptionStatus {
+@ApiModel("Inlong consume status")
+public enum ConsumeStatus {
@ApiModelProperty(value = "To be allocated: 10")
WAIT_ASSIGN(10),
@@ -46,33 +47,41 @@ public enum ConsumptionStatus {
APPROVED(21),
@ApiModelProperty(value = "Cancel application: 22")
- CANCELED(22);
+ CANCELED(22),
+
+ @ApiModelProperty(value = "Deleting: 41")
+ DELETING(41),
+
+ @ApiModelProperty(value = "Deleted: 40")
+ DELETED(40),
+
+ ;
- public static final Set<ConsumptionStatus> ALLOW_SAVE_UPDATE_STATUS = ImmutableSet
+ public static final Set<ConsumeStatus> ALLOW_SAVE_UPDATE_STATUS = ImmutableSet
.of(WAIT_ASSIGN, REJECTED, CANCELED);
- public static final Set<ConsumptionStatus> ALLOW_START_WORKFLOW_STATUS = ImmutableSet.of(WAIT_ASSIGN);
+ public static final Set<ConsumeStatus> ALLOW_START_WORKFLOW_STATUS = ImmutableSet.of(WAIT_ASSIGN);
- private static final Map<Integer, ConsumptionStatus> STATUS_MAP = InlongCollectionUtils.transformToImmutableMap(
- Lists.newArrayList(ConsumptionStatus.values()),
- ConsumptionStatus::getStatus,
+ private static final Map<Integer, ConsumeStatus> STATUS_MAP = InlongCollectionUtils.transformToImmutableMap(
+ Lists.newArrayList(ConsumeStatus.values()),
+ ConsumeStatus::getCode,
Function.identity()
);
- private final int status;
+ private final int code;
- ConsumptionStatus(int status) {
- this.status = status;
+ ConsumeStatus(int code) {
+ this.code = code;
}
- public static ConsumptionStatus fromStatus(int status) {
- ConsumptionStatus consumptionStatus = STATUS_MAP.get(status);
- Preconditions.checkNotNull(consumptionStatus, "status is unavailable :" + status);
- return consumptionStatus;
+ public static ConsumeStatus fromStatus(int status) {
+ ConsumeStatus consumeStatus = STATUS_MAP.get(status);
+ Preconditions.checkNotNull(consumeStatus, "consume status is invalid for " + status);
+ return consumeStatus;
}
- public int getStatus() {
- return status;
+ public int getCode() {
+ return code;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index d6cc20776..87a6e706c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -125,6 +125,14 @@ public enum ErrorCodeEnum {
PULSAR_DLQ_RLQ_ERROR(2606, "Wrong config for the RLQ and DLQ: RLQ was enabled, but the DLQ was disabled"),
PULSAR_DLQ_DUPLICATED(2607, "DLQ topic already exists under the inlong group"),
PULSAR_RLQ_DUPLICATED(2608, "RLQ topic already exists under the inlong group"),
+ CONSUMER_INFO_INCORRECT(2609, "Consumer info was incorrect"),
+ CONSUMER_NOR_FOUND(2609, "Consumer not found"),
+
+ CONSUME_NOT_FOUND(3001, "Inlong consume does not exist/no operation authority"),
+ CONSUME_DUPLICATE(3002, "Inlong consume already exists"),
+ CONSUME_INFO_INCORRECT(3003, "Inlong consume info was incorrect"),
+ CONSUME_SAVE_FAILED(3004, "Failed to save/update inlong consume"),
+ CONSUME_PERMISSION_DENIED(3005, "No permission to access this inlong consume"),
;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java
similarity index 52%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java
index 4b7bdff63..55b1fc645 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java
@@ -15,20 +15,38 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.dao.entity;
-import io.swagger.annotations.ApiModel;
import lombok.Data;
-import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.Date;
/**
- * Inlong group info for TubeMQ
+ * Inlong consume entity.
*/
@Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+public class InlongConsumeEntity implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private Integer id;
+ private String consumerGroup;
+ private String description;
+ private String mqType;
+ private String topic;
+
+ private String inlongGroupId;
+ private Integer filterEnabled;
+ private String inlongStreamId;
+ private String extParams;
- // no field
+ private String inCharges;
+ private Integer status;
+ private Integer isDeleted;
+ private String creator;
+ private String modifier;
+ private Date createTime;
+ private Date modifyTime;
+ private Integer version;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
similarity index 50%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
index 4b7bdff63..3b0205048 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
@@ -15,20 +15,31 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.dao.mapper;
-import io.swagger.annotations.ApiModel;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.springframework.stereotype.Repository;
-/**
- * Inlong group info for TubeMQ
- */
-@Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+import java.util.List;
+import java.util.Map;
+
+@Repository
+public interface InlongConsumeEntityMapper {
+
+ int insert(InlongConsumeEntity record);
+
+ InlongConsumeEntity selectById(Integer id);
+
+ List<Map<String, Object>> countByUser(@Param(value = "username") String username);
+
+ List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request);
+
+ int updateById(InlongConsumeEntity record);
+
+ int updateByIdSelective(InlongConsumeEntity record);
- // no field
+ int deleteById(Integer id);
}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
new file mode 100644
index 000000000..1db846b23
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="consumer_group" jdbcType="VARCHAR" property="consumerGroup"/>
+ <result column="description" jdbcType="VARCHAR" property="description"/>
+ <result column="mq_type" jdbcType="VARCHAR" property="mqType"/>
+ <result column="topic" jdbcType="VARCHAR" property="topic"/>
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+ <result column="filter_enabled" jdbcType="INTEGER" property="filterEnabled"/>
+ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+ <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
+ <result column="in_charges" jdbcType="VARCHAR" property="inCharges"/>
+ <result column="status" jdbcType="INTEGER" property="status"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+ <result column="creator" jdbcType="VARCHAR" property="creator"/>
+ <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+ <result column="version" jdbcType="INTEGER" property="version"/>
+ </resultMap>
+
+ <sql id="Base_Column_List">
+ id, consumer_group, description, mq_type, topic, inlong_group_id, filter_enabled, inlong_stream_id,
+ ext_params, in_charges, status, is_deleted, creator, modifier, create_time, modify_time, version
+ </sql>
+
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+ parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+ insert into inlong_consume (id, consumer_group, description,
+ mq_type, topic, inlong_group_id,
+ filter_enabled, inlong_stream_id,
+ ext_params, in_charges, status,
+ is_deleted, creator, modifier)
+ values (#{id, jdbcType=INTEGER}, #{consumerGroup, jdbcType=VARCHAR}, #{description, jdbcType=VARCHAR},
+ #{mqType, jdbcType=VARCHAR}, #{topic, jdbcType=VARCHAR}, #{inlongGroupId, jdbcType=VARCHAR},
+ #{filterEnabled, jdbcType=INTEGER}, #{inlongStreamId, jdbcType=VARCHAR},
+ #{extParams, jdbcType=LONGVARCHAR}, #{inCharges, jdbcType=VARCHAR}, #{status, jdbcType=INTEGER},
+ #{isDeleted, jdbcType=INTEGER}, #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR})
+ </insert>
+
+ <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_consume
+ where id = #{id, jdbcType=INTEGER}
+ </select>
+ <select id="countByUser" resultType="java.util.Map">
+ select status, count(1) as total
+ from inlong_consume
+ where is_deleted = 0
+ and (creator = #{username, jdbcType=VARCHAR} or FIND_IN_SET(#{username, jdbcType=VARCHAR}, in_charges))
+ group by status
+ </select>
+ <select id="selectByCondition" resultMap="BaseResultMap"
+ parameterType="org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_consume
+ <where>
+ is_deleted = 0
+ <if test="isAdminRole == false">
+ and (
+ creator = #{currentUser, jdbcType=VARCHAR} or FIND_IN_SET(#{currentUser, jdbcType=VARCHAR}, in_charges)
+ )
+ </if>
+ <if test="keyword != null and keyword !=''">
+ and (consumer_group like CONCAT('%', #{keyword}, '%') or topic like CONCAT('%', #{keyword}, '%'))
+ </if>
+ <if test="consumerGroup != null and consumerGroup != ''">
+ and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
+ </if>
+ <if test="mqType != null and mqType != ''">
+ and mq_type = #{mqType, jdbcType=VARCHAR}
+ </if>
+ <if test="topic != null and topic != ''">
+ and topic = #{topic}
+ </if>
+ <if test="inlongGroupId != null and inlongGroupId != ''">
+ and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+ </if>
+ <if test="status != null">
+ and status = #{status, jdbcType=INTEGER}
+ </if>
+ <if test="statusList != null and statusList.size() > 0">
+ and status in
+ <foreach collection="statusList" item="status" index="index" open="(" close=")" separator=",">
+ #{status}
+ </foreach>
+ </if>
+ </where>
+ <choose>
+ <when test="orderField != null and orderField != '' and orderType != null and orderType != ''">
+ order by ${orderField} ${orderType}
+ </when>
+ <otherwise>
+ order by create_time desc
+ </otherwise>
+ </choose>
+ </select>
+
+ <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+ update inlong_consume
+ set consumer_group = #{consumerGroup, jdbcType=VARCHAR},
+ in_charges = #{inCharges, jdbcType=VARCHAR},
+ inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
+ mq_type = #{mqType, jdbcType=VARCHAR},
+ topic = #{topic, jdbcType=VARCHAR},
+ filter_enabled = #{filterEnabled, jdbcType=INTEGER},
+ inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
+ ext_params = #{extParams, jdbcType = LONGVARCHAR},
+ status = #{status, jdbcType=INTEGER},
+ modifier = #{modifier, jdbcType=VARCHAR},
+ is_deleted = #{isDeleted, jdbcType=INTEGER},
+ version = #{version, jdbcType=INTEGER} + 1
+ where id = #{id, jdbcType=INTEGER}
+ and is_deleted = 0
+ and version = #{version, jdbcType=INTEGER}
+ </update>
+
+ <update id="updateByIdSelective"
+ parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+ update inlong_consume
+ <set>
+ <if test="consumerGroup != null">
+ consumer_group = #{consumerGroup, jdbcType=VARCHAR},
+ </if>
+ <if test="description != null">
+ description = #{description,jdbcType=VARCHAR},
+ </if>
+ <if test="mqType != null">
+ mq_type = #{mqType, jdbcType=VARCHAR},
+ </if>
+ <if test="topic != null">
+ topic = #{topic, jdbcType=VARCHAR},
+ </if>
+ <if test="inlongGroupId != null">
+ inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
+ </if>
+ <if test="filterEnabled != null">
+ filter_enabled = #{filterEnabled, jdbcType=INTEGER},
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
+ </if>
+ <if test="extParams != null">
+ ext_params = #{extParams, jdbcType=LONGVARCHAR},
+ </if>
+ <if test="inCharges != null">
+ in_charges = #{inCharges, jdbcType=VARCHAR},
+ </if>
+ <if test="status != null">
+ status = #{status, jdbcType=INTEGER},
+ </if>
+ <if test="modifier != null">
+ modifier = #{modifier, jdbcType=VARCHAR},
+ </if>
+ version = #{version, jdbcType=INTEGER} + 1
+ </set>
+ where id = #{id, jdbcType=INTEGER}
+ and is_deleted = 0
+ and version = #{version, jdbcType=INTEGER}
+ </update>
+
+ <delete id="deleteById" parameterType="java.lang.Integer">
+ delete
+ from inlong_consume
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+</mapper>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java
index 8c1a105c8..fb9af69f4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java
@@ -26,6 +26,8 @@ import io.swagger.annotations.ApiModelProperty;
@ApiModel(value = "Pagination request")
public class PageRequest {
+ public static final Integer MAX_PAGE_SIZE = 100;
+
@ApiModelProperty(value = "Current page number, default is 1")
private int pageNum = 1;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java
new file mode 100644
index 000000000..70ae5f26c
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+/**
+ * Inlong consume brief info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong consume brief info")
+public class InlongConsumeBriefInfo {
+
+ @ApiModelProperty(value = "Primary key")
+ private Integer id;
+
+ @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]")
+ private String consumerGroup;
+
+ @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+ private String mqType;
+
+ @ApiModelProperty(value = "The target topic of inlong consume")
+ private String topic;
+
+ @ApiModelProperty(value = "The target inlongGroupId of inlong consume")
+ private String inlongGroupId;
+
+ @ApiModelProperty(value = "Name of responsible person, separated by commas")
+ private String inCharges;
+
+ @ApiModelProperty(value = "Consume status")
+ private Integer status;
+
+ @ApiModelProperty(value = "Name of creator")
+ private String creator;
+
+ @ApiModelProperty(value = "Name of modifier")
+ private String modifier;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java
similarity index 58%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java
index 4b7bdff63..e48d97e0d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java
@@ -15,20 +15,29 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.pojo.consume;
import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import lombok.NoArgsConstructor;
/**
- * Inlong group info for TubeMQ
+ * Count info of inlong consume status.
*/
@Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+@ApiModel("Count info of inlong consume status")
+public class InlongConsumeCountInfo {
- // no field
+ @ApiModelProperty(value = "Total consume number")
+ private long totalCount;
+
+ @ApiModelProperty(value = "Total number of to be allocated (the number of configuring consumes)")
+ private long waitAssignCount;
+
+ @ApiModelProperty(value = "Total number of to be approved")
+ private long waitApproveCount;
+
+ @ApiModelProperty(value = "Total number of rejections")
+ private long rejectCount;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
new file mode 100644
index 000000000..a1ecf3620
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+import java.util.Date;
+
+/**
+ * Base inlong consume info
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Base inlong consume info")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
+public abstract class InlongConsumeInfo {
+
+ @ApiModelProperty(value = "Primary key")
+ private Integer id;
+
+ @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]")
+ private String consumerGroup;
+
+ @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+ private String mqType;
+
+ @ApiModelProperty(value = "The target topic of this consume")
+ private String topic;
+
+ @ApiModelProperty(value = "The target inlong group id of this consume")
+ private String inlongGroupId;
+
+ @ApiModelProperty(value = "Whether to filter consumption, 0-not filter, 1-filter")
+ private Integer filterEnabled = 0;
+
+ @ApiModelProperty(value = "The target inlong stream id of this consume, needed if the filterEnabled=1")
+ private String inlongStreamId;
+
+ @ApiModelProperty(value = "Cluster URL of message queue")
+ private String clusterUrl;
+
+ @ApiModelProperty(value = "Name of responsible person, separated by commas")
+ private String inCharges;
+
+ @ApiModelProperty(value = "Consume status")
+ private Integer status;
+
+ @ApiModelProperty(value = "Name of creator")
+ private String creator;
+
+ @ApiModelProperty(value = "Name of modifier")
+ private String modifier;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
+
+ @ApiModelProperty(value = "Version number")
+ private Integer version;
+
+ public abstract InlongConsumeRequest genRequest();
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java
new file mode 100644
index 000000000..a8f73552e
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.pojo.common.PageRequest;
+
+import java.util.List;
+
+/**
+ * Inlong consume query request
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong consume query request")
+public class InlongConsumePageRequest extends PageRequest {
+
+ @ApiModelProperty(value = "Keyword, can be consumer group")
+ private String keyword;
+
+ @ApiModelProperty(value = "Consumer group name")
+ private String consumerGroup;
+
+ @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+ private String mqType;
+
+ @ApiModelProperty(value = "The target topic of inlong consume")
+ private String topic;
+
+ @ApiModelProperty(value = "The target inlongGroupId of inlong consume")
+ private String inlongGroupId;
+
+ @ApiModelProperty(value = "Consume status")
+ private Integer status;
+
+ @ApiModelProperty(value = "Consume status list")
+ private List<Integer> statusList;
+
+ @ApiModelProperty(value = "Current user", hidden = true)
+ private String currentUser;
+
+ @ApiModelProperty(value = "Whether the current user is in the administrator role", hidden = true)
+ private Boolean isAdminRole;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java
new file mode 100644
index 000000000..5dfa0bcc1
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base inlong consume request
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Base inlong consume request")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
+public abstract class InlongConsumeRequest {
+
+ @NotNull(groups = UpdateValidation.class)
+ @ApiModelProperty(value = "Primary key")
+ private Integer id;
+
+ @NotBlank(message = "consumerGroup cannot be null")
+ @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]")
+ private String consumerGroup;
+
+ @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+ private String mqType;
+
+ @ApiModelProperty(value = "The target topic of this consume")
+ private String topic;
+
+ @NotBlank(message = "inlong group id cannot be null")
+ @ApiModelProperty(value = "The target inlong group id of this consume")
+ private String inlongGroupId;
+
+ @ApiModelProperty(value = "Whether to filter consumption, 0-not filter, 1-filter")
+ private Integer filterEnabled = 0;
+
+ @ApiModelProperty(value = "The target inlong stream id of this consume, needed if the filterEnabled=1")
+ private String inlongStreamId;
+
+ @NotBlank(message = "inCharges cannot be null")
+ @ApiModelProperty(value = "Name of responsible person, separated by commas")
+ private String inCharges;
+
+ @ApiModelProperty(value = "Version number")
+ private Integer version;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
new file mode 100644
index 000000000..5e6388922
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong group dto of Pulsar
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong group dto of Pulsar")
+public class ConsumePulsarDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+ private Integer isDlq;
+
+ @ApiModelProperty("The name of the dead letter queue Topic")
+ private String deadLetterTopic;
+
+ @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+ private Integer isRlq;
+
+ @ApiModelProperty("The name of the retry letter queue topic")
+ private String retryLetterTopic;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static ConsumePulsarDTO getFromRequest(ConsumePulsarRequest request) {
+ return ConsumePulsarDTO.builder()
+ .isDlq(request.getIsDlq())
+ .deadLetterTopic(request.getDeadLetterTopic())
+ .isRlq(request.getIsRlq())
+ .retryLetterTopic(request.getRetryLetterTopic())
+ .build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static ConsumePulsarDTO getFromJson(@NotNull String extParams) {
+ try {
+ return OBJECT_MAPPER.readValue(extParams, ConsumePulsarDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java
new file mode 100644
index 000000000..9a1f07324
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+
+/**
+ * Inlong consume info of Pulsar
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.PULSAR)
+@ApiModel("Inlong consume info of Pulsar")
+public class ConsumePulsarInfo extends InlongConsumeInfo {
+
+ @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+ private Integer isDlq;
+
+ @ApiModelProperty("The name of the dead letter queue Topic")
+ private String deadLetterTopic;
+
+ @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+ private Integer isRlq;
+
+ @ApiModelProperty("The name of the retry letter queue topic")
+ private String retryLetterTopic;
+
+ @Override
+ public ConsumePulsarRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, ConsumePulsarRequest::new);
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java
new file mode 100644
index 000000000..f125fd461
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+/**
+ * Inlong consume request of Pulsar
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Inlong consume request of Pulsar")
+@JsonTypeDefine(value = MQType.PULSAR)
+public class ConsumePulsarRequest extends InlongConsumeRequest {
+
+ @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+ private Integer isDlq;
+
+ @ApiModelProperty("The name of the dead letter queue Topic")
+ private String deadLetterTopic;
+
+ @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+ private Integer isRlq;
+
+ @ApiModelProperty("The name of the retry letter queue topic")
+ private String retryLetterTopic;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
new file mode 100644
index 000000000..6b0b313d5
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.tubemq;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong group info of TubeMQ
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@ApiModel("Inlong group info of TubeMQ")
+public class ConsumeTubeMQDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ // no fields
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static ConsumeTubeMQDTO getFromRequest(ConsumeTubeMQRequest request) {
+ return ConsumeTubeMQDTO.builder().build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static ConsumeTubeMQDTO getFromJson(@NotNull String extParams) {
+ try {
+ return OBJECT_MAPPER.readValue(extParams, ConsumeTubeMQDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
similarity index 53%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
index 4b7bdff63..3c2d4ec52 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
@@ -15,20 +15,32 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.pojo.consume.tubemq;
import io.swagger.annotations.ApiModel;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
/**
- * Inlong group info for TubeMQ
+ * Inlong consume info of TubeMQ
*/
@Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.TUBEMQ)
+@ApiModel("Inlong consume info of TubeMQ")
+public class ConsumeTubeMQInfo extends InlongConsumeInfo {
- // no field
+ // no fields
+
+ @Override
+ public ConsumeTubeMQRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, ConsumeTubeMQRequest::new);
+ }
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java
similarity index 60%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java
index 4b7bdff63..5e051a593 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java
@@ -15,20 +15,26 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.pojo.consume.tubemq;
import io.swagger.annotations.ApiModel;
import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
/**
- * Inlong group info for TubeMQ
+ * Inlong consume request of TubeMQ
*/
@Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.TUBEMQ)
+@ApiModel("Inlong consume request of TubeMQ")
+public class ConsumeTubeMQRequest extends InlongConsumeRequest {
- // no field
+ // no fields
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index 3c07a9c79..b01595500 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -98,9 +98,6 @@ public abstract class InlongGroupRequest {
@ApiModelProperty(value = "Name of followers, separated by commas")
private String followers;
- @ApiModelProperty(value = "Name of creator")
- private String creator;
-
@ApiModelProperty(value = "Inlong group Extension properties")
private List<InlongGroupExtInfo> extList;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
index 4b7bdff63..01f9ad348 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
@@ -29,6 +29,6 @@ import lombok.NoArgsConstructor;
@ApiModel("Inlong group info for TubeMQ")
public class InlongTubeMQDTO {
- // no field
+ // no fields
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
new file mode 100644
index 000000000..e9f886909
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Default operator of inlong consume.
+ */
+public abstract class AbstractConsumeOperator implements InlongConsumeOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumeOperator.class);
+
+ @Autowired
+ private InlongConsumeEntityMapper consumeMapper;
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class)
+ public Integer saveOpt(InlongConsumeRequest request, String operator) {
+ // firstly check the topic info
+ this.checkTopicInfo(request);
+
+ // set the ext params, init status, and other info
+ InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new);
+ this.setTargetEntity(request, entity);
+ entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
+ entity.setCreator(operator);
+ entity.setModifier(operator);
+
+ consumeMapper.insert(entity);
+ return entity.getId();
+ }
+
+ /**
+ * Set the parameters of the target entity.
+ *
+ * @param request inlong consume request
+ * @param targetEntity targetEntity which will set the new parameters
+ */
+ protected abstract void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity);
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+ public void updateOpt(InlongConsumeRequest request, String operator) {
+ // get the entity from request
+ InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new);
+ // set the ext params
+ this.setTargetEntity(request, entity);
+ entity.setModifier(operator);
+
+ int rowCount = consumeMapper.updateByIdSelective(entity);
+ if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error("inlong consume has already updated with id={}, expire version={}",
+ request.getId(), request.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
new file mode 100644
index 000000000..20d697fe0
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Inlong consume operator for Pulsar.
+ */
+@Service
+public class ConsumePulsarOperator extends AbstractConsumeOperator {
+
+ private static final int DLQ_RLQ_ENABLE = 1;
+ private static final int DLQ__RLQ_DISABLE = 0;
+ // Topic prefix for the dead letter queue
+ private static final String PREFIX_DLQ = "dlq";
+ // Topic prefix for the retry letter queue
+ private static final String PREFIX_RLQ = "rlq";
+
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
+ @Autowired
+ private InlongStreamEntityMapper streamMapper;
+ @Autowired
+ private InlongClusterService clusterService;
+ @Autowired
+ private InlongStreamService streamService;
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String mqType) {
+ return getMQType().equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType);
+ }
+
+ @Override
+ public String getMQType() {
+ return MQType.PULSAR;
+ }
+
+ @Override
+ public void checkTopicInfo(InlongConsumeRequest request) {
+ // one inlong stream only has one Pulsar topic,
+ // one inlong group may have multiple Pulsar topics.
+ String groupId = request.getInlongGroupId();
+ String originTopic = request.getTopic();
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, originTopic);
+ Preconditions.checkNotNull(streamEntity, "not found any Pulsar topic for inlong group " + groupId);
+
+ // format the topic to 'tenant/namespace/topic'
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
+ groupEntity.getInlongClusterTag(), null, ClusterType.PULSAR);
+ String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
+ ? InlongConstants.DEFAULT_PULSAR_TENANT
+ : pulsarCluster.getTenant();
+
+ request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
+ groupEntity.getMqResource(), originTopic));
+ }
+
+ @Override
+ public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
+ Preconditions.checkNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());
+
+ ConsumePulsarInfo consumeInfo = new ConsumePulsarInfo();
+ CommonBeanUtils.copyProperties(entity, consumeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ ConsumePulsarDTO dto = ConsumePulsarDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, consumeInfo);
+ }
+
+ return consumeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) {
+ // prerequisite for RLQ to be turned on: DLQ must be turned on.
+ // it means, if DLQ is closed, RLQ cannot exist alone and must be closed.
+ ConsumePulsarRequest pulsarRequest = (ConsumePulsarRequest) request;
+ boolean dlqEnable = (DLQ_RLQ_ENABLE == pulsarRequest.getIsDlq());
+ boolean rlqEnable = (DLQ_RLQ_ENABLE == pulsarRequest.getIsRlq());
+ if (rlqEnable && !dlqEnable) {
+ throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
+ }
+
+ // when saving, the DLQ / RLQ under the same groupId cannot be repeated.
+ // when updating, delete the related DLQ / RLQ info.
+ String groupId = targetEntity.getInlongGroupId();
+ if (dlqEnable) {
+ String dlqTopic = PREFIX_DLQ + "_" + pulsarRequest.getDeadLetterTopic();
+ Preconditions.checkTrue(!streamService.exist(groupId, dlqTopic),
+ ErrorCodeEnum.PULSAR_DLQ_DUPLICATED.getMessage());
+ } else {
+ pulsarRequest.setIsDlq(DLQ__RLQ_DISABLE);
+ pulsarRequest.setDeadLetterTopic(null);
+ }
+ if (rlqEnable) {
+ String rlqTopic = PREFIX_RLQ + "_" + pulsarRequest.getRetryLetterTopic();
+ Preconditions.checkTrue(!streamService.exist(groupId, rlqTopic),
+ ErrorCodeEnum.PULSAR_RLQ_DUPLICATED.getMessage());
+ } else {
+ pulsarRequest.setIsRlq(DLQ__RLQ_DISABLE);
+ pulsarRequest.setRetryLetterTopic(null);
+ }
+
+ try {
+ targetEntity.setExtParams(objectMapper.writeValueAsString(ConsumePulsarDTO.getFromRequest(pulsarRequest)));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CONSUME_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
new file mode 100644
index 000000000..d507c2d2c
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
+import org.apache.inlong.manager.pojo.consume.tubemq.ConsumeTubeMQDTO;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Objects;
+
+/**
+ * Inlong consume operator for TubeMQ.
+ */
+@Service
+public class ConsumeTubeMQOperator extends AbstractConsumeOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeTubeMQOperator.class);
+
+ @Autowired
+ private InlongGroupService groupService;
+
+ @Override
+ public Boolean accept(String mqType) {
+ return getMQType().equals(mqType);
+ }
+
+ @Override
+ public String getMQType() {
+ return MQType.TUBEMQ;
+ }
+
+ @Override
+ public void checkTopicInfo(InlongConsumeRequest request) {
+ String groupId = request.getInlongGroupId();
+ InlongGroupTopicInfo topicInfo = groupService.getTopic(groupId);
+ Preconditions.checkNotNull(topicInfo, "inlong group not exist: " + groupId);
+
+ // one inlong group only has one TubeMQ topic
+ String mqResource = topicInfo.getMqResource();
+ Preconditions.checkTrue(Objects.equals(mqResource, request.getTopic()),
+ String.format("inlong consume topic %s not belongs to inlong group %s", request.getTopic(), groupId));
+ }
+
+ @Override
+ public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
+ Preconditions.checkNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());
+
+ ConsumePulsarInfo consumeInfo = new ConsumePulsarInfo();
+ CommonBeanUtils.copyProperties(entity, consumeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ ConsumeTubeMQDTO dto = ConsumeTubeMQDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, consumeInfo);
+ }
+
+ return consumeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) {
+ LOGGER.info("do nothing for inlong consume with TubeMQ");
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java
new file mode 100644
index 000000000..52b51c892
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+/**
+ * Interface of the inlong consume operator.
+ */
+public interface InlongConsumeOperator {
+
+ /**
+ * Determines whether the current instance matches the specified type.
+ */
+ Boolean accept(String mqType);
+
+ /**
+ * Get the MQ type.
+ *
+ * @return MQ type string
+ */
+ String getMQType();
+
+ /**
+ * Check whether the topic in inlong consume belongs to its inlong group id.
+ *
+ * @param request inlong consume request
+ */
+ void checkTopicInfo(InlongConsumeRequest request);
+
+ /**
+ * Save the inlong consume info.
+ *
+ * @param request request of the group
+ * @param operator name of the operator
+ * @return inlong consume id after saving
+ */
+ Integer saveOpt(InlongConsumeRequest request, String operator);
+
+ /**
+ * Get the inlong consume info from the given entity.
+ *
+ * @param entity get field value from the entity
+ * @return inlong consume info after encapsulating
+ */
+ InlongConsumeInfo getFromEntity(InlongConsumeEntity entity);
+
+ /**
+ * Update the inlong consume info.
+ *
+ * @param request request of update
+ * @param operator name of operator
+ */
+ void updateOpt(InlongConsumeRequest request, String operator);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
new file mode 100644
index 000000000..1a4a57482
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory for {@link InlongConsumeOperator}.
+ */
+@Service
+public class InlongConsumeOperatorFactory {
+
+ @Autowired
+ private List<InlongConsumeOperator> consumeOperatorList;
+
+ /**
+ * Get a consumption operator instance via the given mqType
+ */
+ public InlongConsumeOperator getInstance(String mqType) {
+ return consumeOperatorList.stream()
+ .filter(inst -> inst.accept(mqType))
+ .findFirst()
+ .orElseThrow(() -> new
+ BusinessException(String.format(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage(), mqType)));
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
similarity index 82%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
index 0e413701f..d4be887e4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core.impl;
+package org.apache.inlong.manager.service.consume;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -34,9 +34,12 @@ import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+/**
+ * Operation to the inlong consume process.
+ */
@Slf4j
@Service
-public class ConsumptionProcessService {
+public class InlongConsumeProcessService {
@Autowired
private ConsumptionService consumptionService;
@@ -45,13 +48,20 @@ public class ConsumptionProcessService {
@Autowired
private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
+ /**
+ * Start the process for the specified ID.
+ *
+ * @param id inlong consume id
+ * @param operator name of operator
+ * @return workflow result
+ */
public WorkflowResult startProcess(Integer id, String operator) {
ConsumptionInfo consumptionInfo = consumptionService.get(id);
- Preconditions.checkTrue(ConsumptionStatus.ALLOW_START_WORKFLOW_STATUS.contains(
- ConsumptionStatus.fromStatus(consumptionInfo.getStatus())),
- "current status not allow start workflow");
+ Preconditions.checkTrue(ConsumeStatus.ALLOW_START_WORKFLOW_STATUS.contains(
+ ConsumeStatus.fromStatus(consumptionInfo.getStatus())),
+ "current status not allowed to start workflow");
- consumptionInfo.setStatus(ConsumptionStatus.WAIT_APPROVE.getStatus());
+ consumptionInfo.setStatus(ConsumeStatus.WAIT_APPROVE.getCode());
boolean rowCount = consumptionService.update(consumptionInfo, operator);
Preconditions.checkTrue(rowCount, "update consumption failed");
@@ -72,4 +82,5 @@ public class ConsumptionProcessService {
form.setConsumptionInfo(consumptionInfo);
return form;
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
new file mode 100644
index 000000000..ce421e337
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong consume service layer interface
+ */
+public interface InlongConsumeService {
+
+ /**
+ * Save inlong consume info.
+ *
+ * @param request consume request need to save
+ * @param operator name of operator
+ * @return inlong consume id after saving
+ */
+ Integer save(InlongConsumeRequest request, String operator);
+
+ /**
+ * Get inlong consume info based on ID
+ *
+ * @param id inlong consume id
+ * @return detail of inlong group
+ */
+ InlongConsumeInfo get(Integer id);
+
+ /**
+ * Check whether the consumer group exists or not
+ *
+ * @param consumerGroup consumer group
+ * @param excludeSelfId exclude the ID of this record
+ * @return true if exists, false if not exists
+ */
+ boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId);
+
+ /**
+ * Paging query inlong consume info list
+ *
+ * @param request pagination query request
+ * @return inlong consume list
+ */
+ PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request);
+
+ /**
+ * Query the inlong consume statistics info via the username
+ *
+ * @param username username
+ * @return inlong consume status statistics
+ */
+ InlongConsumeCountInfo countStatus(String username);
+
+ /**
+ * Update the inlong consume
+ *
+ * @param request inlong consume request that needs to be updated
+ * @param operator name of operator
+ * @return whether succeed
+ */
+ Boolean update(@Valid @NotNull(message = "inlong consume request cannot be null") InlongConsumeRequest request,
+ String operator);
+
+ /**
+ * Delete the inlong consume by the id
+ *
+ * @param id inlong consume id that needs to be deleted
+ * @param operator name of operator
+ * @return whether succeed
+ */
+ Boolean delete(Integer id, String operator);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
new file mode 100644
index 000000000..c7977b500
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
+import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
+
+/**
+ * Inlong consume service layer implementation
+ */
+@Service
+public class InlongConsumeServiceImpl implements InlongConsumeService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InlongConsumeServiceImpl.class);
+
+ @Autowired
+ private InlongConsumeEntityMapper consumeMapper;
+ @Autowired
+ private InlongConsumeOperatorFactory consumeOperatorFactory;
+
+ @Override
+ public Integer save(InlongConsumeRequest request, String operator) {
+ LOGGER.debug("begin to save inlong consume={} by user={}", request, operator);
+ Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+ Preconditions.checkNotNull(request.getTopic(), "inlong consume topic cannot be null");
+ String consumerGroup = request.getConsumerGroup();
+ Preconditions.checkNotNull(consumerGroup, "inlong consume topic cannot be null");
+ if (consumerGroupExists(consumerGroup, request.getId())) {
+ throw new BusinessException(String.format("consumer group %s already exist", consumerGroup));
+ }
+
+ InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType());
+ consumeOperator.saveOpt(request, operator);
+
+ LOGGER.info("success to save inlong consume for consumer group={} by user={}", consumerGroup, operator);
+ return request.getId();
+ }
+
+ @Override
+ public boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId) {
+ InlongConsumePageRequest request = InlongConsumePageRequest.builder()
+ .consumerGroup(consumerGroup)
+ .isAdminRole(true)
+ .build();
+ List<InlongConsumeEntity> result = consumeMapper.selectByCondition(request);
+ if (excludeSelfId != null) {
+ result = result.stream()
+ .filter(consumer -> !excludeSelfId.equals(consumer.getId()))
+ .collect(Collectors.toList());
+ }
+ return CollectionUtils.isNotEmpty(result);
+ }
+
+ @Override
+ public InlongConsumeInfo get(Integer id) {
+ Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+ InlongConsumeEntity entity = consumeMapper.selectById(id);
+ if (entity == null) {
+ LOGGER.error("inlong consume not found with id={}", id);
+ throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
+ }
+
+ InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(entity.getMqType());
+ InlongConsumeInfo consumeInfo = consumeOperator.getFromEntity(entity);
+
+ LOGGER.debug("success to get inlong consume for id={}", id);
+ return consumeInfo;
+ }
+
+ @Override
+ public InlongConsumeCountInfo countStatus(String username) {
+ List<Map<String, Object>> statusCount = consumeMapper.countByUser(username);
+ InlongConsumeCountInfo countInfo = new InlongConsumeCountInfo();
+ for (Map<String, Object> map : statusCount) {
+ int status = (Integer) map.get("status");
+ long count = (Long) map.get("count");
+ countInfo.setTotalCount(countInfo.getTotalCount() + count);
+ if (status == ConsumeStatus.WAIT_ASSIGN.getCode()) {
+ countInfo.setWaitAssignCount(countInfo.getWaitAssignCount() + count);
+ } else if (status == ConsumeStatus.WAIT_APPROVE.getCode()) {
+ countInfo.setWaitApproveCount(countInfo.getWaitApproveCount() + count);
+ } else if (status == ConsumeStatus.REJECTED.getCode()) {
+ countInfo.setRejectCount(countInfo.getRejectCount() + count);
+ }
+ }
+
+ LOGGER.debug("success to count inlong consume for user={}", username);
+ return countInfo;
+ }
+
+ @Override
+ public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request) {
+ if (request.getPageSize() > MAX_PAGE_SIZE) {
+ LOGGER.warn("list inlong consumes, change page size from {} to {}", request.getPageSize(), MAX_PAGE_SIZE);
+ request.setPageSize(MAX_PAGE_SIZE);
+ }
+ PageHelper.startPage(request.getPageNum(), request.getPageSize());
+ OrderFieldEnum.checkOrderField(request);
+ OrderTypeEnum.checkOrderType(request);
+ Page<InlongConsumeEntity> entityPage = (Page<InlongConsumeEntity>) consumeMapper.selectByCondition(request);
+ List<InlongConsumeBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(entityPage,
+ InlongConsumeBriefInfo::new);
+ PageResult<InlongConsumeBriefInfo> pageResult = new PageResult<>(briefInfos,
+ entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
+
+ LOGGER.debug("success to list inlong consume for {}", request);
+ return pageResult;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+ propagation = Propagation.REQUIRES_NEW)
+ public Boolean update(InlongConsumeRequest request, String operator) {
+ LOGGER.debug("begin to update inlong consume={} by user={}", request, operator);
+ Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+
+ // check if it can be modified
+ Integer consumeId = request.getId();
+ InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId);
+ Preconditions.checkNotNull(existEntity, "inlong consume not exist with id " + consumeId);
+ Preconditions.checkTrue(existEntity.getInCharges().contains(operator),
+ "operator" + operator + " has no privilege for the inlong consume");
+
+ if (!Objects.equals(existEntity.getVersion(), request.getVersion())) {
+ LOGGER.error(String.format("inlong consume has already updated, id=%s, curVersion=%s",
+ existEntity.getId(), request.getVersion()));
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+
+ ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(existEntity.getStatus());
+ Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+ "inlong consume not allowed update when status is " + consumeStatus.name());
+
+ InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType());
+ consumeOperator.updateOpt(request, operator);
+
+ LOGGER.info("success to update inlong consume={} by user={}", request, operator);
+ return true;
+ }
+
+ @Override
+ public Boolean delete(Integer id, String operator) {
+ LOGGER.info("begin to delete inlong consume for id={} by user={}", id, operator);
+ Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+ InlongConsumeEntity entity = consumeMapper.selectById(id);
+ Preconditions.checkNotNull(entity, "inlong consume not exist with id " + id);
+
+ entity.setIsDeleted(id);
+ entity.setStatus(ConsumeStatus.DELETED.getCode());
+ entity.setModifier(operator);
+
+ int rowCount = consumeMapper.updateByIdSelective(entity);
+ if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error("inlong consume has already updated with id={}, curVersion={}", id, entity.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+
+ LOGGER.info("success to delete inlong consume for id={} by user={}", id, operator);
+ return true;
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 8a899a390..5641a293c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -104,9 +104,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
return ConsumptionSummary.builder()
.totalCount(countMap.values().stream().mapToInt(c -> c).sum())
- .waitingAssignCount(countMap.getOrDefault(ConsumptionStatus.WAIT_ASSIGN.getStatus() + "", 0))
- .waitingApproveCount(countMap.getOrDefault(ConsumptionStatus.WAIT_APPROVE.getStatus() + "", 0))
- .rejectedCount(countMap.getOrDefault(ConsumptionStatus.REJECTED.getStatus() + "", 0)).build();
+ .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.WAIT_ASSIGN.getCode() + "", 0))
+ .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.WAIT_APPROVE.getCode() + "", 0))
+ .rejectedCount(countMap.getOrDefault(ConsumeStatus.REJECTED.getCode() + "", 0)).build();
}
@Override
@@ -166,9 +166,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
if (info.getId() != null) {
ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
- ConsumptionStatus consumptionStatus = ConsumptionStatus.fromStatus(consumptionEntity.getStatus());
- Preconditions.checkTrue(ConsumptionStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumptionStatus),
- "consumption not allow update when status is " + consumptionStatus.name());
+ ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(consumptionEntity.getStatus());
+ Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+ "consumption not allow update when status is " + consumeStatus.name());
}
setTopicInfo(info);
@@ -284,7 +284,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
// If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to
// add/remove inlong streams in the inlong group
- if (ConsumptionStatus.APPROVED.getStatus() == exists.getStatus()) {
+ if (ConsumeStatus.APPROVED.getCode() == exists.getStatus()) {
String groupId = info.getInlongGroupId();
String dlqNameOld = pulsarEntity.getDeadLetterTopic();
String dlqNameNew = update.getDeadLetterTopic();
@@ -356,7 +356,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
entity.setInCharges(groupInfo.getInCharges());
entity.setFilterEnabled(0);
- entity.setStatus(ConsumptionStatus.APPROVED.getStatus());
+ entity.setStatus(ConsumeStatus.APPROVED.getCode());
String operator = groupInfo.getCreator();
entity.setCreator(operator);
entity.setModifier(operator);
@@ -377,7 +377,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
- entity.setStatus(ConsumptionStatus.WAIT_ASSIGN.getStatus());
+ entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
entity.setCreator(operator);
entity.setModifier(operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index 0132b6259..8d950b706 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -49,7 +49,7 @@ import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
/**
- * Operation related to inlong group process
+ * Operation to the inlong group process
*/
@Service
public class InlongGroupProcessService {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index d9c5f1c63..150c8e317 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -77,6 +77,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
+
/**
* Inlong group service layer implementation
*/
@@ -85,7 +87,6 @@ import java.util.stream.Collectors;
public class InlongGroupServiceImpl implements InlongGroupService {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupServiceImpl.class);
- private static final Integer MAX_PAGE_SIZE = 100;
@Autowired
private InlongGroupOperatorFactory groupOperatorFactory;
@@ -184,7 +185,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request) {
if (request.getPageSize() > MAX_PAGE_SIZE) {
- LOGGER.warn("list group info, but page size is {}, change to {}", request.getPageSize(), MAX_PAGE_SIZE);
+ LOGGER.warn("list inlong groups, change page size from {} to {}", request.getPageSize(), MAX_PAGE_SIZE);
request.setPageSize(MAX_PAGE_SIZE);
}
PageHelper.startPage(request.getPageNum(), request.getPageSize());
@@ -275,8 +276,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
return true;
}
- @Transactional(rollbackFor = Throwable.class)
@Override
+ @Transactional(rollbackFor = Throwable.class)
public boolean delete(String groupId, String operator) {
LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator);
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
index caded9717..c06c3d5a2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.listener.consumption;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -58,7 +58,7 @@ public class ConsumptionCancelProcessListener implements ProcessEventListener {
ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId());
- update.setStatus(ConsumptionStatus.CANCELED.getStatus());
+ update.setStatus(ConsumeStatus.CANCELED.getCode());
int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
log.error("consumption information has already updated, id={}, curVersion={}",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
index c42930487..e9d0795b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -104,7 +104,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
*/
private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
ConsumptionEntity update = consumptionMapper.selectByPrimaryKey(consumptionId);
- update.setStatus(ConsumptionStatus.APPROVED.getStatus());
+ update.setStatus(ConsumeStatus.APPROVED.getCode());
update.setConsumerGroup(consumerGroup);
int rowCount = consumptionMapper.updateByPrimaryKeySelective(update);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
index 72af56894..b1e5030e7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.listener.consumption;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -60,7 +60,7 @@ public class ConsumptionRejectProcessListener implements ProcessEventListener {
ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId());
- update.setStatus(ConsumptionStatus.REJECTED.getStatus());
+ update.setStatus(ConsumeStatus.REJECTED.getCode());
int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index ea5fdca22..d844285b7 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -140,6 +140,33 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
);
+-- ----------------------------
+-- Table structure for inlong_consume
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_consume`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group name, filled in by the user, undeleted ones cannot be repeated',
+ `description` varchar(256) DEFAULT '' COMMENT 'Inlong consume description',
+ `mq_type` varchar(10) DEFAULT 'TUBEMQ' COMMENT 'Message queue type, high throughput: TUBEMQ, high consistency: PULSAR',
+ `topic` varchar(256) NOT NULL COMMENT 'The target topic of this consume',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'The target inlong group id of this consume',
+ `filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter consume, 0: not filter, 1: filter',
+ `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'The target inlong stream id of this consume, needed if the filter_enabled=1',
+ `ext_params` mediumtext DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+ `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+ `status` int(4) DEFAULT '100' COMMENT 'Inlong consume status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_inlong_consume` (`consumer_group`, `is_deleted`)
+);
+
+
-- ----------------------------
-- Table structure for data_node
-- ----------------------------
@@ -166,7 +193,7 @@ CREATE TABLE IF NOT EXISTS `data_node`
);
-- ----------------------------
--- Table structure for consumption
+-- Deprecated: Table structure for consumption
-- ----------------------------
CREATE TABLE IF NOT EXISTS `consumption`
(
@@ -189,7 +216,7 @@ CREATE TABLE IF NOT EXISTS `consumption`
);
-- ----------------------------
--- Table structure for consumption_pulsar
+-- Deprecated: Table structure for consumption_pulsar
-- ----------------------------
CREATE TABLE IF NOT EXISTS `consumption_pulsar`
(
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 057af951f..cfa64e471 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -151,6 +151,33 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster node table';
+-- ----------------------------
+-- Table structure for inlong_consume
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_consume`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group name, filled in by the user, undeleted ones cannot be repeated',
+ `description` varchar(256) DEFAULT '' COMMENT 'Inlong consume description',
+ `mq_type` varchar(10) DEFAULT 'TUBEMQ' COMMENT 'Message queue type, high throughput: TUBEMQ, high consistency: PULSAR',
+ `topic` varchar(256) NOT NULL COMMENT 'The target topic of this consume',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'The target inlong group id of this consume',
+ `filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter consume, 0: not filter, 1: filter',
+ `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'The target inlong stream id of this consume, needed if the filter_enabled=1',
+ `ext_params` mediumtext DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+ `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+ `status` int(4) DEFAULT '100' COMMENT 'Inlong consume status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_inlong_consume` (`consumer_group`, `is_deleted`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong consume table';
+
-- ----------------------------
-- Table structure for data_node
-- ----------------------------
@@ -178,7 +205,7 @@ CREATE TABLE IF NOT EXISTS `data_node`
DEFAULT CHARSET = utf8mb4 COMMENT ='Data node table';
-- ----------------------------
--- Table structure for consumption
+-- Deprecated: Table structure for consumption
-- ----------------------------
CREATE TABLE IF NOT EXISTS `consumption`
(
@@ -202,7 +229,7 @@ CREATE TABLE IF NOT EXISTS `consumption`
DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
-- ----------------------------
--- Table structure for consumption_pulsar
+-- Deprecated: Table structure for consumption_pulsar
-- ----------------------------
CREATE TABLE IF NOT EXISTS `consumption_pulsar`
(
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
index 7207ce726..f1c77ad51 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.core.impl.ConsumptionProcessService;
+import org.apache.inlong.manager.service.consume.InlongConsumeProcessService;
import org.apache.inlong.manager.service.operationlog.OperationLog;
import org.apache.inlong.manager.service.user.LoginUserUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -53,7 +53,7 @@ public class ConsumptionController {
@Autowired
private ConsumptionService consumptionService;
@Autowired
- private ConsumptionProcessService processOperation;
+ private InlongConsumeProcessService processOperation;
@GetMapping("/consumption/summary")
@ApiOperation(value = "Get data consumption summary")
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
new file mode 100644
index 000000000..4697cbe57
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.enums.UserTypeEnum;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.service.consume.InlongConsumeProcessService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.user.LoginUserUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Inlong consume control layer
+ */
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Consume-API")
+public class InlongConsumeController {
+
+ @Autowired
+ private InlongConsumeService consumeService;
+ @Autowired
+ private InlongConsumeProcessService consumeProcessService;
+
+ @RequestMapping(value = "/consume/save", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE)
+ @ApiOperation(value = "Save inlong consume")
+ public Response<Integer> save(@Validated(UpdateValidation.class) @RequestBody InlongConsumeRequest request) {
+ String operator = LoginUserUtils.getLoginUser().getName();
+ return Response.success(consumeService.save(request, operator));
+ }
+
+ @GetMapping("/consume/get/{id}")
+ @ApiOperation(value = "Get inlong consume")
+ @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+ public Response<InlongConsumeInfo> get(@PathVariable(name = "id") Integer id) {
+ return Response.success(consumeService.get(id));
+ }
+
+ @GetMapping(value = "/consume/countStatus")
+ @ApiOperation(value = "Count inlong consume status by current user")
+ public Response<InlongConsumeCountInfo> countStatusByUser() {
+ return Response.success(consumeService.countStatus(LoginUserUtils.getLoginUser().getName()));
+ }
+
+ @GetMapping("/consume/list")
+ @ApiOperation(value = "List inlong consume by pagination")
+ public Response<PageResult<InlongConsumeBriefInfo>> list(InlongConsumePageRequest request) {
+ request.setCurrentUser(LoginUserUtils.getLoginUser().getName());
+ request.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserTypeEnum.ADMIN.name()));
+ return Response.success(consumeService.list(request));
+ }
+
+ @PostMapping("/consume/update/{id}")
+ @OperationLog(operation = OperationType.UPDATE)
+ @ApiOperation(value = "Update inlong consume")
+ public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody InlongConsumeRequest request) {
+ return Response.success(consumeService.update(request, LoginUserUtils.getLoginUser().getName()));
+ }
+
+ @DeleteMapping("/consume/delete/{id}")
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiOperation(value = "Delete inlong consume by ID")
+ @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+ public Response<Object> delete(@PathVariable(name = "id") Integer id) {
+ this.consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
+ return Response.success();
+ }
+
+ @PostMapping("/consume/startProcess/{id}")
+ @OperationLog(operation = OperationType.UPDATE)
+ @ApiOperation(value = "Start inlong consume process")
+ @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+ public Response<WorkflowResult> startProcess(@PathVariable(name = "id") Integer id) {
+ String username = LoginUserUtils.getLoginUser().getName();
+ return Response.success(consumeProcessService.startProcess(id, username));
+ }
+
+}