You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/29 11:58:45 UTC
[inlong] branch master updated: [INLONG-6085][Manager] Remove the unused data consumption related classes (#6314)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 32e017abc [INLONG-6085][Manager] Remove the unused data consumption related classes (#6314)
32e017abc is described below
commit 32e017abcc556407ce3314711656edf5ab78efa0
Author: healchow <he...@gmail.com>
AuthorDate: Sat Oct 29 19:58:41 2022 +0800
[INLONG-6085][Manager] Remove the unused data consumption related classes (#6314)
---
.../manager/dao/entity/ConsumptionEntity.java | 48 ---
.../dao/entity/ConsumptionPulsarEntity.java | 41 ---
.../dao/mapper/ConsumptionEntityMapper.java | 50 ---
.../dao/mapper/ConsumptionPulsarEntityMapper.java | 43 ---
.../dao/mapper/InlongConsumeEntityMapper.java | 7 +-
.../resources/mappers/ConsumptionEntityMapper.xml | 311 ----------------
.../mappers/ConsumptionPulsarEntityMapper.xml | 181 ----------
.../mappers/InlongConsumeEntityMapper.xml | 62 +++-
.../manager/pojo/consumption/ConsumptionInfo.java | 101 ------
.../pojo/consumption/ConsumptionListVo.java | 66 ----
.../pojo/consumption/ConsumptionMqExtBase.java | 56 ---
.../pojo/consumption/ConsumptionPulsarInfo.java | 52 ---
.../manager/pojo/consumption/ConsumptionQuery.java | 74 ----
.../pojo/consumption/ConsumptionSummary.java | 48 ---
.../form/process/ApplyConsumeProcessForm.java | 2 +-
.../service/consume/ConsumePulsarOperator.java | 36 +-
.../consume/InlongConsumeOperatorFactory.java | 7 +-
.../service/consume/InlongConsumeServiceImpl.java | 9 +-
.../manager/service/core/ConsumptionService.java | 89 -----
.../service/core/impl/ConsumptionServiceImpl.java | 399 ---------------------
.../service/group/InlongGroupOperator4Pulsar.java | 9 +
.../{core => }/heartbeat/HeartbeatManager.java | 4 +-
.../impl => heartbeat}/HeartbeatServiceImpl.java | 3 +-
.../apply/ApproveConsumeProcessListener.java | 6 +-
.../queue/pulsar/PulsarResourceOperator.java | 2 +-
.../service/cluster/InlongClusterServiceTest.java | 2 +-
.../core/heartbeat/HeartbeatManagerTest.java | 1 +
.../service/core/impl/ConsumptionServiceTest.java | 72 ----
.../main/resources/h2/apache_inlong_manager.sql | 41 +--
.../manager-web/sql/apache_inlong_manager.sql | 44 +--
.../web/controller/ConsumptionController.java | 115 ------
.../inlong/manager/workflow/WorkflowConfig.java | 100 ------
32 files changed, 103 insertions(+), 1978 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionEntity.java
deleted file mode 100644
index 329fd167b..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionEntity.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Date;
-
-/**
- * Data consumption table
- */
-@Data
-public class ConsumptionEntity implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private Integer id;
- private String consumerGroup;
- private String inCharges;
- private String inlongGroupId;
- private String mqType;
- private String topic;
- private Integer filterEnabled;
- private String inlongStreamId;
- private Integer status;
- private String creator;
- private String modifier;
- private Date createTime;
- private Date modifyTime;
- private Integer isDeleted;
- private Integer version;
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
deleted file mode 100644
index 32e9ed129..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-/**
- * Data consumption table, including group name, group id, topic, etc
- */
-@Data
-public class ConsumptionPulsarEntity implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private Integer id;
- private Integer consumptionId;
- private String consumerGroup;
- private String inlongGroupId;
- private Integer isDlq;
- private String deadLetterTopic;
- private Integer isRlq;
- private String retryLetterTopic;
- private Integer isDeleted;
-
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
deleted file mode 100644
index 838488f96..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.mapper;
-
-import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.pojo.common.CountInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
-import org.springframework.stereotype.Repository;
-
-import java.util.List;
-
-@Repository
-public interface ConsumptionEntityMapper {
-
- int deleteByPrimaryKey(Integer id);
-
- int insert(ConsumptionEntity record);
-
- int insertSelective(ConsumptionEntity record);
-
- ConsumptionEntity selectByPrimaryKey(Integer id);
-
- ConsumptionEntity selectConsumptionExists(@Param("groupId") String groupId, @Param("topic") String topic,
- @Param("consumerGroup") String consumerGroup);
-
- int updateByPrimaryKeySelective(ConsumptionEntity record);
-
- int updateByPrimaryKey(ConsumptionEntity record);
-
- List<ConsumptionEntity> listByQuery(ConsumptionQuery consumptionQuery);
-
- List<CountInfo> countByQuery(ConsumptionQuery consumptionQuery);
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
deleted file mode 100644
index 99f65d22c..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.mapper;
-
-import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
-import org.springframework.stereotype.Repository;
-
-@Repository
-public interface ConsumptionPulsarEntityMapper {
-
- int deleteByConsumptionId(@Param("consumptionId") Integer consumptionId);
-
- int insert(ConsumptionPulsarEntity record);
-
- int insertSelective(ConsumptionPulsarEntity record);
-
- ConsumptionPulsarEntity selectByPrimaryKey(@Param("id") Integer id);
-
- ConsumptionPulsarEntity selectByConsumptionId(@Param("consumptionId") Integer consumptionId);
-
- int updateByPrimaryKeySelective(ConsumptionPulsarEntity record);
-
- int updateByConsumptionId(ConsumptionPulsarEntity record);
-
- int updateByPrimaryKey(ConsumptionPulsarEntity record);
-
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
index 6b0db6617..a6b6090a0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.pojo.common.CountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
import org.springframework.stereotype.Repository;
@@ -34,11 +35,13 @@ public interface InlongConsumeEntityMapper {
List<CountInfo> countByUser(@Param(value = "username") String username);
- List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request);
-
InlongConsumeEntity selectExists(@Param("consumerGroup") String consumerGroup, @Param("topic") String topic,
@Param("inlongGroupId") String inlongGroupId);
+ List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request);
+
+ List<InlongConsumeBriefInfo> selectBriefList(InlongConsumePageRequest request);
+
int updateById(InlongConsumeEntity record);
int updateByIdSelective(InlongConsumeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
deleted file mode 100644
index 9865ca0a1..000000000
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
+++ /dev/null
@@ -1,311 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper">
- <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
- <id column="id" jdbcType="INTEGER" property="id"/>
- <result column="consumer_group" jdbcType="VARCHAR" property="consumerGroup"/>
- <result column="in_charges" jdbcType="VARCHAR" property="inCharges"/>
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
- <result column="mq_type" jdbcType="VARCHAR" property="mqType"/>
- <result column="topic" jdbcType="VARCHAR" property="topic"/>
- <result column="filter_enabled" jdbcType="INTEGER" property="filterEnabled"/>
- <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
- <result column="status" jdbcType="INTEGER" property="status"/>
- <result column="creator" jdbcType="VARCHAR" property="creator"/>
- <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
- <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
- <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
- <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
- <result column="version" jdbcType="INTEGER" property="version"/>
- </resultMap>
- <sql id="Base_Column_List">
- id, consumer_group, in_charges, inlong_group_id,
- mq_type, topic, filter_enabled, inlong_stream_id,
- status, is_deleted, creator, modifier, create_time, modify_time, version
- </sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer"
- resultMap="BaseResultMap">
- select
- <include refid="Base_Column_List"/>
- from consumption
- where id = #{id, jdbcType=INTEGER} and is_deleted = 0
- </select>
- <select id="selectConsumptionExists" resultType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
- select
- <include refid="Base_Column_List"/>
- from consumption
- where inlong_group_id = #{groupId, jdbcType=VARCHAR}
- and topic = #{topic, jdbcType=VARCHAR}
- and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
- and is_deleted = 0
- </select>
-
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- update consumption
- set is_deleted = id
- where id = #{id, jdbcType=INTEGER}
- </delete>
- <insert id="insert" useGeneratedKeys="true" keyProperty="id"
- parameterType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
- insert into consumption (id, consumer_group, in_charges,
- inlong_group_id, mq_type, topic,
- filter_enabled, inlong_stream_id,
- status, creator, modifier)
- values (#{id, jdbcType=INTEGER}, #{consumerGroup, jdbcType=VARCHAR}, #{inCharges, jdbcType=VARCHAR},
- #{inlongGroupId, jdbcType=VARCHAR}, #{mqType, jdbcType=VARCHAR}, #{topic, jdbcType=VARCHAR},
- #{filterEnabled, jdbcType=INTEGER}, #{inlongStreamId, jdbcType=VARCHAR},
- #{status, jdbcType=INTEGER}, #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR})
- </insert>
- <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
- parameterType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
- insert into consumption
- <trim prefix="(" suffix=")" suffixOverrides=",">
- <if test="id != null">
- id,
- </if>
- <if test="consumerGroup != null">
- consumer_group,
- </if>
- <if test="inCharges != null">
- in_charges,
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id,
- </if>
- <if test="mqType != null">
- mq_type,
- </if>
- <if test="topic != null">
- topic,
- </if>
- <if test="filterEnabled != null">
- filter_enabled,
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id,
- </if>
- <if test="status != null">
- status,
- </if>
- <if test="creator != null">
- creator,
- </if>
- <if test="modifier != null">
- modifier,
- </if>
- </trim>
- <trim prefix="values (" suffix=")" suffixOverrides=",">
- <if test="id != null">
- #{id, jdbcType=INTEGER},
- </if>
- <if test="consumerGroup != null">
- #{consumerGroup, jdbcType=VARCHAR},
- </if>
- <if test="inCharges != null">
- #{inCharges, jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- #{inlongGroupId, jdbcType=VARCHAR},
- </if>
- <if test="mqType != null">
- #{mqType, jdbcType=VARCHAR},
- </if>
- <if test="topic != null">
- #{topic, jdbcType=VARCHAR},
- </if>
- <if test="filterEnabled != null">
- #{filterEnabled, jdbcType=INTEGER},
- </if>
- <if test="inlongStreamId != null">
- #{inlongStreamId, jdbcType=VARCHAR},
- </if>
- <if test="status != null">
- #{status, jdbcType=INTEGER},
- </if>
- <if test="creator != null">
- #{creator, jdbcType=VARCHAR},
- </if>
- <if test="modifier != null">
- #{modifier, jdbcType=VARCHAR},
- </if>
- </trim>
- </insert>
- <update id="updateByPrimaryKeySelective"
- parameterType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
- update consumption
- <set>
- <if test="consumerGroup != null">
- consumer_group = #{consumerGroup, jdbcType=VARCHAR},
- </if>
- <if test="inCharges != null">
- in_charges = #{inCharges, jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
- </if>
- <if test="mqType != null">
- mq_type = #{mqType, jdbcType=VARCHAR},
- </if>
- <if test="topic != null">
- topic = #{topic, jdbcType=VARCHAR},
- </if>
- <if test="filterEnabled != null">
- filter_enabled = #{filterEnabled, jdbcType=INTEGER},
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
- </if>
- <if test="status != null">
- status = #{status, jdbcType=INTEGER},
- </if>
- <if test="creator != null">
- creator = #{creator, jdbcType=VARCHAR},
- </if>
- <if test="modifier != null">
- modifier = #{modifier, jdbcType=VARCHAR},
- </if>
- version = #{version, jdbcType=INTEGER} + 1
- </set>
- where id = #{id, jdbcType=INTEGER}
- and is_deleted = 0
- and version = #{version, jdbcType=INTEGER}
- </update>
- <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
- update consumption
- set consumer_group = #{consumerGroup, jdbcType=VARCHAR},
- in_charges = #{inCharges, jdbcType=VARCHAR},
- inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
- mq_type = #{mqType, jdbcType=VARCHAR},
- topic = #{topic, jdbcType=VARCHAR},
- filter_enabled = #{filterEnabled, jdbcType=INTEGER},
- inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
- status = #{status, jdbcType=INTEGER},
- creator = #{creator, jdbcType=VARCHAR},
- modifier = #{modifier, jdbcType=VARCHAR},
- is_deleted = #{isDeleted, jdbcType=INTEGER},
- version = #{version, jdbcType=INTEGER} + 1
- where id = #{id, jdbcType=INTEGER}
- and is_deleted = 0
- and version = #{version, jdbcType=INTEGER}
- </update>
-
- <select id="listByQuery"
- parameterType="org.apache.inlong.manager.pojo.consumption.ConsumptionQuery"
- resultMap="BaseResultMap">
- select
- c.*
- from consumption c
- where c.is_deleted=0
- <if test="consumerGroup != null and consumerGroup != ''">
- and c.consumer_group = #{consumerGroup, jdbcType=VARCHAR}
- </if>
- <if test="inlongGroupId != null and inlongGroupId != ''">
- and c.inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
- </if>
- <if test="mqType != null and mqType != ''">
- and c.mq_type = #{mqType, jdbcType=VARCHAR}
- </if>
- <if test="topic != null and topic != ''">
- and c.topic like CONCAT('%', #{topic}, '%')
- </if>
- <if test="filterEnabled != null">
- and c.filter_enabled = #{filterEnabled, jdbcType=INTEGER}
- </if>
- <if test="inlongStreamId != null and inlongStreamId != ''">
- and c.inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}
- </if>
- <if test="status != null">
- and c.status = #{status, jdbcType=INTEGER}
- </if>
- <if test="creator != null and creator != ''">
- and c.creator = #{creator, jdbcType=VARCHAR}
- </if>
- <if test="modifier != null and modifier != ''">
- and c.modifier = #{modifier, jdbcType=VARCHAR}
- </if>
- <if test="isAdminRole == false">
- and (
- FIND_IN_SET(#{username, jdbcType=VARCHAR},c.in_charges)
- or c.creator = #{username, jdbcType=VARCHAR}
- )
- </if>
- <if test="keyword != null and keyword !=''">
- and (c.topic like CONCAT('%', #{keyword}, '%') or c.consumer_group like CONCAT('%', #{keyword}, '%'))
- </if>
- <if test="lastConsumptionStatus != null and lastConsumptionStatus != 3">
- and cas.latest_record_state = #{lastConsumptionStatus, jdbcType=INTEGER}
- </if>
- <if test="lastConsumptionStatus != null and lastConsumptionStatus == 3">
- and (cas.latest_record_state = #{lastConsumptionStatus, jdbcType=INTEGER}
- or cas.latest_record_state is null)
- </if>
- order by id desc
- </select>
-
- <select id="countByQuery"
- parameterType="org.apache.inlong.manager.pojo.consumption.ConsumptionQuery"
- resultType="org.apache.inlong.manager.pojo.common.CountInfo">
- select status as `key`, count(1) as `value`
- from consumption
- where is_deleted=0
- <if test="consumerGroup != null and consumerGroup != ''">
- and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
- </if>
- <if test="inCharges != null and inCharges != ''">
- and FIND_IN_SET(#{inCharges, jdbcType=VARCHAR},in_charges)
- </if>
- <if test="inlongGroupId != null and inlongGroupId != ''">
- and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
- </if>
- <if test="mqType != null and mqType != ''">
- and mq_type = #{mqType, jdbcType=VARCHAR}
- </if>
- <if test="topic != null and topic != ''">
- and topic = #{topic, jdbcType=VARCHAR}
- </if>
- <if test="filterEnabled != null">
- and filter_enabled = #{filterEnabled, jdbcType=INTEGER}
- </if>
- <if test="inlongStreamId != null and inlongStreamId != ''">
- and inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}
- </if>
- <if test="status != null">
- and status = #{status, jdbcType=INTEGER}
- </if>
- <if test="creator != null and creator != ''">
- and creator = #{creator, jdbcType=VARCHAR}
- </if>
- <if test="modifier != null and modifier != ''">
- and modifier = #{modifier, jdbcType=VARCHAR}
- </if>
- <if test="username != null and username !=''">
- and (
- FIND_IN_SET(#{username, jdbcType=VARCHAR},in_charges)
- or creator = #{username, jdbcType=VARCHAR}
- )
- </if>
- <if test="keyword != null and keyword !=''">
- and ( topic like CONCAT('%', #{keyword}, '%') or consumer_group like CONCAT('%', #{keyword}, '%') )
- </if>
- group by status
- </select>
-</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml
deleted file mode 100644
index f5eb73453..000000000
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml
+++ /dev/null
@@ -1,181 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper">
- <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
- <id column="id" jdbcType="INTEGER" property="id"/>
- <result column="consumption_id" jdbcType="VARCHAR" property="consumptionId"/>
- <result column="consumer_group" jdbcType="VARCHAR" property="consumerGroup"/>
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
- <result column="is_rlq" jdbcType="INTEGER" property="isRlq"/>
- <result column="retry_letter_topic" jdbcType="VARCHAR" property="retryLetterTopic"/>
- <result column="is_dlq" jdbcType="INTEGER" property="isDlq"/>
- <result column="dead_letter_topic" jdbcType="VARCHAR" property="deadLetterTopic"/>
- <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
- </resultMap>
- <sql id="Base_Column_List">
- id, consumption_id, consumer_group, inlong_group_id, is_rlq, retry_letter_topic,
- is_dlq, dead_letter_topic, is_deleted
- </sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
- select
- <include refid="Base_Column_List"/>
- from consumption_pulsar
- where id = #{id,jdbcType=INTEGER}
- </select>
- <select id="selectByConsumptionId" parameterType="java.lang.Integer" resultMap="BaseResultMap">
- select
- <include refid="Base_Column_List"/>
- from consumption_pulsar
- where consumption_id = #{consumptionId, jdbcType=INTEGER}
- and is_deleted = 0
- </select>
-
- <delete id="deleteByConsumptionId">
- update consumption_pulsar
- set is_deleted = id
- where consumption_id = #{consumptionId, jdbcType=INTEGER}
- and is_deleted = 0
- </delete>
-
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
- insert into consumption_pulsar (id, consumption_id, consumer_group,
- inlong_group_id, is_rlq, retry_letter_topic,
- is_dlq, dead_letter_topic, is_deleted)
- values (#{id,jdbcType=INTEGER}, #{consumptionId,jdbcType=INTEGER}, #{consumerGroup,jdbcType=VARCHAR},
- #{inlongGroupId,jdbcType=VARCHAR}, #{isRlq,jdbcType=INTEGER}, #{retryLetterTopic,jdbcType=VARCHAR},
- #{isDlq,jdbcType=INTEGER}, #{deadLetterTopic,jdbcType=VARCHAR}, #{isDeleted,jdbcType=INTEGER})
- </insert>
- <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
- insert into consumption_pulsar
- <trim prefix="(" suffix=")" suffixOverrides=",">
- <if test="id != null">
- id,
- </if>
- <if test="consumptionId != null">
- consumption_id,
- </if>
- <if test="consumerGroup != null">
- consumer_group,
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id,
- </if>
- <if test="isRlq != null">
- is_rlq,
- </if>
- <if test="retryLetterTopic != null">
- retry_letter_topic,
- </if>
- <if test="isDlq != null">
- is_dlq,
- </if>
- <if test="deadLetterTopic != null">
- dead_letter_topic,
- </if>
- <if test="isDeleted != null">
- is_deleted,
- </if>
- </trim>
- <trim prefix="values (" suffix=")" suffixOverrides=",">
- <if test="id != null">
- #{id,jdbcType=INTEGER},
- </if>
- <if test="consumerGroup != null">
- #{consumerGroup,jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="isRlq != null">
- #{isRlq,jdbcType=INTEGER},
- </if>
- <if test="retryLetterTopic != null">
- #{retryLetterTopic,jdbcType=VARCHAR},
- </if>
- <if test="isDlq != null">
- #{isDlq,jdbcType=INTEGER},
- </if>
- <if test="deadLetterTopic != null">
- #{deadLetterTopic,jdbcType=VARCHAR},
- </if>
- <if test="isDeleted != null">
- #{isDeleted,jdbcType=INTEGER},
- </if>
- </trim>
- </insert>
- <update id="updateByPrimaryKeySelective"
- parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
- update consumption_pulsar
- <set>
- <if test="consumptionId != null">
- consumption_id = #{consumptionId,jdbcType=INTEGER},
- </if>
- <if test="consumerGroup != null">
- consumer_group = #{consumerGroup,jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="isRlq != null">
- is_rlq = #{isRlq,jdbcType=INTEGER},
- </if>
- <if test="retryLetterTopic != null">
- retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
- </if>
- <if test="isDlq != null">
- is_dlq = #{isDlq,jdbcType=INTEGER},
- </if>
- <if test="deadLetterTopic != null">
- dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
- </if>
- <if test="isDeleted != null">
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- </if>
- </set>
- where id = #{id,jdbcType=INTEGER}
- </update>
- <update id="updateByConsumptionId">
- update consumption_pulsar
- <set>
- consumer_group = #{consumerGroup,jdbcType=VARCHAR},
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- is_rlq = #{isRlq,jdbcType=INTEGER},
- retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
- is_dlq = #{isDlq,jdbcType=INTEGER},
- dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- </set>
- where consumption_id = #{consumptionId,jdbcType=INTEGER}
- </update>
- <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
- update consumption_pulsar
- set consumption_id = #{consumptionId,jdbcType=INTEGER},
- consumer_group = #{consumerGroup,jdbcType=VARCHAR},
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- is_rlq = #{isRlq,jdbcType=INTEGER},
- retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
- is_dlq = #{isDlq,jdbcType=INTEGER},
- dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
- is_deleted = #{isDeleted,jdbcType=INTEGER}
- where id = #{id,jdbcType=INTEGER}
- </update>
-</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
index c38c53028..031d96c4c 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
@@ -74,6 +74,16 @@
and (creator = #{username, jdbcType=VARCHAR} or FIND_IN_SET(#{username, jdbcType=VARCHAR}, in_charges))
group by status
</select>
+ <select id="selectExists" resultType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_consume
+ where is_deleted = 0
+ and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
+ and topic = #{topic, jdbcType=VARCHAR}
+ and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+ limit 1
+ </select>
<select id="selectByCondition" resultMap="BaseResultMap"
parameterType="org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest">
select
@@ -120,15 +130,51 @@
</otherwise>
</choose>
</select>
- <select id="selectExists" resultType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
- select
- <include refid="Base_Column_List"/>
+ <select id="selectBriefList" parameterType="org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest"
+ resultType="org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo">
+ select id, consumer_group, mq_type, topic, inlong_group_id,
+ in_charges, status, creator, modifier, create_time, modify_time
from inlong_consume
- where is_deleted = 0
- and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
- and topic = #{topic, jdbcType=VARCHAR}
- and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
- limit 1
+ <where>
+ is_deleted = 0
+ <if test="isAdminRole == false">
+ and (
+ creator = #{currentUser, jdbcType=VARCHAR} or FIND_IN_SET(#{currentUser, jdbcType=VARCHAR}, in_charges)
+ )
+ </if>
+ <if test="keyword != null and keyword !=''">
+ and (consumer_group like CONCAT('%', #{keyword}, '%') or topic like CONCAT('%', #{keyword}, '%'))
+ </if>
+ <if test="consumerGroup != null and consumerGroup != ''">
+ and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
+ </if>
+ <if test="mqType != null and mqType != ''">
+ and mq_type = #{mqType, jdbcType=VARCHAR}
+ </if>
+ <if test="topic != null and topic != ''">
+ and topic = #{topic}
+ </if>
+ <if test="inlongGroupId != null and inlongGroupId != ''">
+ and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+ </if>
+ <if test="status != null">
+ and status = #{status, jdbcType=INTEGER}
+ </if>
+ <if test="statusList != null and statusList.size() > 0">
+ and status in
+ <foreach collection="statusList" item="status" index="index" open="(" close=")" separator=",">
+ #{status}
+ </foreach>
+ </if>
+ </where>
+ <choose>
+ <when test="orderField != null and orderField != '' and orderType != null and orderType != ''">
+ order by ${orderField} ${orderType}
+ </when>
+ <otherwise>
+ order by create_time desc
+ </otherwise>
+ </choose>
</select>
<update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionInfo.java
deleted file mode 100644
index 7e2abccfc..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.lang3.StringUtils;
-
-import javax.validation.constraints.AssertTrue;
-import javax.validation.constraints.NotBlank;
-import java.util.Date;
-
-/**
- * Data consumption info
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Data consumption info")
-public class ConsumptionInfo {
-
- @ApiModelProperty(value = "key id")
- private Integer id;
-
- @ApiModelProperty(value = "consumer group: only support [a-zA-Z0-9_]")
- @NotBlank(message = "consumerGroup cannot be null")
- private String consumerGroup;
-
- @ApiModelProperty(value = "consumption in charge")
- @NotBlank(message = "inCharges cannot be null")
- private String inCharges;
-
- @ApiModelProperty(value = "consumption target inlong group id")
- @NotBlank(message = "inlong group id cannot be null")
- private String inlongGroupId;
-
- @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
- private String mqType;
-
- @ApiModelProperty(value = "consumption target topic")
- private String topic;
-
- @ApiModelProperty(value = "middleware cluster url")
- private String masterUrl;
-
- @ApiModelProperty(value = "whether to filter consumption, 0: not filter, 1: filter")
- @Builder.Default
- private Integer filterEnabled = 0;
-
- @ApiModelProperty(value = "consumption target inlong stream id")
- private String inlongStreamId;
-
- @ApiModelProperty(value = "status, 10: pending assigned, 11: pending approval, "
- + "20: approval rejected, 20: approved")
- private Integer status;
-
- private String creator;
-
- private String modifier;
-
- private Date createTime;
-
- private Date modifyTime;
-
- @ApiModelProperty(value = "Extended information for MQ")
- private ConsumptionMqExtBase mqExtInfo;
-
- @ApiModelProperty(value = "Version number")
- private Integer version;
-
- @JsonIgnore
- @AssertTrue(message = "when filter enabled, inlong stream id cannot be null")
- public boolean isValidateFilter() {
- if (filterEnabled == 0) {
- return true;
- }
- return StringUtils.isNotBlank(inlongStreamId);
- }
-
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionListVo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionListVo.java
deleted file mode 100644
index 0f7affd43..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionListVo.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.Date;
-
-/**
- * Data consumption list
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Data consumption list")
-public class ConsumptionListVo {
-
- @ApiModelProperty(value = "Primary key")
- private Integer id;
-
- @ApiModelProperty(value = "Consumer Group")
- private String consumerGroup;
-
- @ApiModelProperty(value = "Person in charge of consumption")
- private String inCharges;
-
- @ApiModelProperty(value = "Consumption target inlong group id")
- private String inlongGroupId;
-
- @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
- private String mqType;
-
- @ApiModelProperty(value = "Consumption target TOPIC")
- private String topic;
-
- @ApiModelProperty(value = "Status: Pending distribution: 10,"
- + " Pending approval: 11, Approval rejected: 20, Approval passed: 21")
- private Integer status;
-
- @ApiModelProperty(value = "Recent consumption time")
- private Date lastConsumptionTime;
-
- @ApiModelProperty(value = "Consumption status: normal: 0, abnormal: 1, shielded: 2, no: 3,")
- private Integer lastConsumptionStatus;
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionMqExtBase.java
deleted file mode 100644
index b177fda17..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionMqExtBase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Extended consumption information of different MQs
- */
-@Data
-@ApiModel("Extended consumption information of different MQs")
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType",
- defaultImpl = ConsumptionMqExtBase.class)
-@JsonSubTypes({
- @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = "PULSAR"),
- @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = "TDMQ_PULSAR")
-})
-public class ConsumptionMqExtBase {
-
- @ApiModelProperty(value = "Primary key")
- private Integer id;
-
- @ApiModelProperty(value = "Consumption ID")
- private Integer consumptionId;
-
- @ApiModelProperty(value = "Consumer group")
- private String consumerGroup;
-
- @ApiModelProperty(value = "Consumption target inlong group id")
- private String inlongGroupId;
-
- @ApiModelProperty("Whether to delete, 0: not deleted, 1: deleted")
- private Integer isDeleted = 0;
-
- @ApiModelProperty("The type of MQ")
- private String mqType;
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionPulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionPulsarInfo.java
deleted file mode 100644
index c9cbce4dd..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionPulsarInfo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
-import org.apache.inlong.manager.common.consts.MQType;
-
-/**
- * Pulsar consumer information
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-@ToString(callSuper = true)
-@ApiModel("Pulsar consumer information")
-public class ConsumptionPulsarInfo extends ConsumptionMqExtBase {
-
- public ConsumptionPulsarInfo() {
- this.setMqType(MQType.PULSAR);
- }
-
- @ApiModelProperty("Whether to configure the dead letter queue, 0: do not configure, 1: configure")
- private Integer isDlq;
-
- @ApiModelProperty("The name of the dead letter queue Topic")
- private String deadLetterTopic;
-
- @ApiModelProperty("Whether to configure the retry letter queue, 0: do not configure, 1: configure")
- private Integer isRlq;
-
- @ApiModelProperty("The name of the retry letter queue topic")
- private String retryLetterTopic;
-
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionQuery.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionQuery.java
deleted file mode 100644
index c8c22e9d9..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionQuery.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.pojo.common.PageRequest;
-
-/**
- * Data consumption query
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-@ApiModel("Data consumption query conditions")
-public class ConsumptionQuery extends PageRequest {
-
- @ApiModelProperty(value = "Consumer Group")
- private String consumerGroup;
-
- @ApiModelProperty(value = "Person in charge of consumption")
- private String inCharges;
-
- @ApiModelProperty(value = "Consumption target inlong group id")
- private String inlongGroupId;
-
- @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
- private String mqType;
-
- @ApiModelProperty(value = "Consumption target Topic")
- private String topic;
-
- @ApiModelProperty(value = "Whether to filter consumption")
- private Boolean filterEnabled;
-
- @ApiModelProperty(value = "Consumption target stream id")
- private String inlongStreamId;
-
- @ApiModelProperty(value = "Status: Draft: 0, Pending distribution: 10, "
- + "Pending approval: 11, Approval rejected: 20, Approved: 21")
- private Integer status;
-
- @ApiModelProperty(value = "Consumption status: normal: 0, abnormal: 1, shielded: 2, no: 3")
- private Integer lastConsumptionStatus;
-
- private String creator;
-
- private String modifier;
-
- @ApiModelProperty(value = "Current login user")
- private String username;
-
- @ApiModelProperty(value = "Weather current user have admin role", hidden = true)
- private Boolean isAdminRole;
-
- @ApiModelProperty(value = "Fuzzy query keyword, topic, or consumer group")
- private String keyword;
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionSummary.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionSummary.java
deleted file mode 100644
index 5264f216b..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionSummary.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/**
- * Data consumption statistics
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Data consumption statistics")
-public class ConsumptionSummary {
-
- @ApiModelProperty(value = "Total consumption")
- private Integer totalCount;
-
- @ApiModelProperty(value = "Amount to be allocated")
- private Integer waitingAssignCount;
-
- @ApiModelProperty(value = "Amount to be approved")
- private Integer waitingApproveCount;
-
- @ApiModelProperty(value = "Quantity rejected")
- private Integer rejectedCount;
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
index 85d657c45..9e535c989 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import java.util.Map;
/**
- * New data consumption form
+ * Apply inlong consume process form
*/
@Data
@EqualsAndHashCode(callSuper = true)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
index 5d13d65ee..a21c36670 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -21,23 +21,21 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.MQType;
-import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
-import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -56,7 +54,7 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
private static final String PREFIX_RLQ = "rlq";
@Autowired
- private InlongGroupEntityMapper groupMapper;
+ private InlongGroupService groupService;
@Autowired
private InlongStreamEntityMapper streamMapper;
@Autowired
@@ -81,20 +79,18 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
// one inlong stream only has one Pulsar topic,
// one inlong group may have multiple Pulsar topics.
String groupId = request.getInlongGroupId();
+ InlongGroupTopicInfo topicInfo = groupService.getTopic(groupId);
+ Preconditions.checkNotNull(topicInfo, "inlong group not exist for groupId=" + groupId);
+
+ // check the origin topic from request exists
+ InlongPulsarTopicInfo pulsarTopic = (InlongPulsarTopicInfo) topicInfo;
String originTopic = request.getTopic();
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, originTopic);
- Preconditions.checkNotNull(streamEntity, "not found any Pulsar topic for inlong group " + groupId);
+ Preconditions.checkTrue(pulsarTopic.getTopics().contains(originTopic),
+ "Pulsar topic not exist for " + originTopic);
// format the topic to 'tenant/namespace/topic'
- InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
- PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
- groupEntity.getInlongClusterTag(), null, ClusterType.PULSAR);
- String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
- ? InlongConstants.DEFAULT_PULSAR_TENANT
- : pulsarCluster.getTenant();
-
- request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
- groupEntity.getMqResource(), originTopic));
+ request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
+ pulsarTopic.getTenant(), pulsarTopic.getNamespace(), originTopic));
}
@Override
@@ -122,8 +118,8 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
}
- // when saving, the DLQ / RLQ under the same groupId cannot be repeated.
- // when updating, delete the related DLQ / RLQ info.
+ // TODO when saving, save the enabled DLQ / RLQ into inlong_stream, then create Pulsar topic for them
+ // when updating, delete the related DLQ / RLQ info if they were disabled.
String groupId = targetEntity.getInlongGroupId();
if (dlqEnable) {
String dlqTopic = PREFIX_DLQ + "_" + pulsarRequest.getDeadLetterTopic();
@@ -132,6 +128,7 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
} else {
pulsarRequest.setIsDlq(DLQ__RLQ_DISABLE);
pulsarRequest.setDeadLetterTopic(null);
+ // streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
}
if (rlqEnable) {
String rlqTopic = PREFIX_RLQ + "_" + pulsarRequest.getRetryLetterTopic();
@@ -140,6 +137,7 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
} else {
pulsarRequest.setIsRlq(DLQ__RLQ_DISABLE);
pulsarRequest.setRetryLetterTopic(null);
+ // streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
}
try {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
index 1a4a57482..e4a252d02 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
@@ -34,13 +34,14 @@ public class InlongConsumeOperatorFactory {
private List<InlongConsumeOperator> consumeOperatorList;
/**
- * Get a consumption operator instance via the given mqType
+ * Get an inlong consume operator instance via the given mqType
*/
public InlongConsumeOperator getInstance(String mqType) {
return consumeOperatorList.stream()
.filter(inst -> inst.accept(mqType))
.findFirst()
- .orElseThrow(() -> new
- BusinessException(String.format(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage(), mqType)));
+ .orElseThrow(() -> new BusinessException(
+ String.format(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage(), mqType))
+ );
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
index 51b2868f5..f9ca2eb6c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -24,7 +24,6 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
@@ -59,7 +58,7 @@ import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
public class InlongConsumeServiceImpl implements InlongConsumeService {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongConsumeServiceImpl.class);
- private static final String AUTO_CREATE_MSG = "auto create by inlong";
+ private static final String AUTO_CREATE_MSG = "auto_create_by_system";
@Autowired
private InlongConsumeEntityMapper consumeMapper;
@@ -175,11 +174,9 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
- Page<InlongConsumeEntity> entityPage = (Page<InlongConsumeEntity>) consumeMapper.selectByCondition(request);
- List<InlongConsumeBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(entityPage,
- InlongConsumeBriefInfo::new);
+ Page<InlongConsumeBriefInfo> briefInfos = (Page<InlongConsumeBriefInfo>) consumeMapper.selectBriefList(request);
PageResult<InlongConsumeBriefInfo> pageResult = new PageResult<>(briefInfos,
- entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
+ briefInfos.getTotal(), briefInfos.getPageNum(), briefInfos.getPageSize());
LOGGER.debug("success to list inlong consume for {}", request);
return pageResult;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
deleted file mode 100644
index 5a181fd97..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core;
-
-import org.apache.inlong.manager.pojo.common.PageResult;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-
-/**
- * Data consumption interface
- */
-public interface ConsumptionService {
-
- /**
- * Data consumption statistics
- *
- * @param query Query conditions
- * @return Statistics
- */
- ConsumptionSummary getSummary(ConsumptionQuery query);
-
- /**
- * Get data consumption list according to query conditions
- *
- * @param query Consumption info
- * @return Consumption list
- */
- PageResult<ConsumptionListVo> list(ConsumptionQuery query);
-
- /**
- * Get data consumption details
- *
- * @param id Consumer ID
- * @return Details
- */
- ConsumptionInfo get(Integer id);
-
- /**
- * Determine whether the Consumer group already exists
- *
- * @param consumerGroup Consumer group
- * @param excludeSelfId Exclude the ID of this record
- * @return does it exist
- */
- boolean isConsumerGroupExists(String consumerGroup, Integer excludeSelfId);
-
- /**
- * Save basic data consumption information
- *
- * @param consumptionInfo Data consumption information
- * @param operator Operator
- * @return ID after saved
- */
- Integer save(ConsumptionInfo consumptionInfo, String operator);
-
- /**
- * Update the person in charge of data consumption, etc
- *
- * @param consumptionInfo consumption information
- * @param operator Operator
- */
- Boolean update(ConsumptionInfo consumptionInfo, String operator);
-
- /**
- * Delete data consumption
- *
- * @param id Consumer ID
- * @param operator Operator
- */
- Boolean delete(Integer id, String operator);
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
deleted file mode 100644
index f64eaa192..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.consts.MQType;
-import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.ConsumeStatus;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.pojo.common.PageResult;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionMqExtBase;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
-import org.apache.inlong.manager.pojo.user.UserRoleCode;
-import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
-import org.apache.inlong.manager.service.user.LoginUserUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.util.CollectionUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * Data consumption service
- */
-@Slf4j
-@Service
-public class ConsumptionServiceImpl implements ConsumptionService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsumptionServiceImpl.class);
-
- private static final String PREFIX_DLQ = "dlq"; // prefix of the Topic of the dead letter queue
-
- private static final String PREFIX_RLQ = "rlq"; // prefix of the Topic of the retry letter queue
-
- @Autowired
- private InlongGroupEntityMapper groupMapper;
- @Autowired
- private ConsumptionEntityMapper consumptionMapper;
- @Autowired
- private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
- @Autowired
- private InlongGroupService groupService;
- @Autowired
- private InlongStreamService streamService;
- @Autowired
- private InlongClusterService clusterService;
-
- @Override
- public ConsumptionSummary getSummary(ConsumptionQuery query) {
- Map<String, Integer> countMap = new HashMap<>();
- consumptionMapper.countByQuery(query)
- .forEach(countInfo -> countMap.put(countInfo.getKey(), countInfo.getValue()));
-
- return ConsumptionSummary.builder()
- .totalCount(countMap.values().stream().mapToInt(c -> c).sum())
- .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.TO_BE_SUBMIT.getCode() + "", 0))
- .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.TO_BE_APPROVAL.getCode() + "", 0))
- .rejectedCount(countMap.getOrDefault(ConsumeStatus.APPROVE_REJECTED.getCode() + "", 0)).build();
- }
-
- @Override
- public PageResult<ConsumptionListVo> list(ConsumptionQuery query) {
- PageHelper.startPage(query.getPageNum(), query.getPageSize());
- query.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.ADMIN));
- Page<ConsumptionEntity> pageResult = (Page<ConsumptionEntity>) consumptionMapper.listByQuery(query);
- List<ConsumptionListVo> consumptionListVos = CommonBeanUtils.copyListProperties(pageResult.getResult(),
- ConsumptionListVo::new);
-
- return new PageResult<>(
- consumptionListVos, pageResult.getTotal(), pageResult.getPageNum(), pageResult.getPageSize()
- );
- }
-
- @Override
- public ConsumptionInfo get(Integer id) {
- Preconditions.checkNotNull(id, "consumption id cannot be null");
- ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(id);
- Preconditions.checkNotNull(entity, "consumption not exist with id:" + id);
-
- ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
- String mqType = info.getMqType();
- if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
- ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
- Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
- ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
- info.setMqExtInfo(pulsarInfo);
- }
-
- return info;
- }
-
- @Override
- public boolean isConsumerGroupExists(String consumerGroup, Integer excludeSelfId) {
- ConsumptionQuery consumptionQuery = new ConsumptionQuery();
- consumptionQuery.setConsumerGroup(consumerGroup);
- consumptionQuery.setIsAdminRole(true);
- List<ConsumptionEntity> result = consumptionMapper.listByQuery(consumptionQuery);
- if (excludeSelfId != null) {
- result = result.stream().filter(consumer -> !excludeSelfId.equals(consumer.getId()))
- .collect(Collectors.toList());
- }
- return !CollectionUtils.isEmpty(result);
- }
-
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public Integer save(ConsumptionInfo info, String operator) {
- log.debug("begin to save consumption info={}", info);
- Preconditions.checkNotNull(info, "consumption info cannot be null");
- Preconditions.checkNotNull(info.getTopic(), "consumption topic cannot be empty");
- if (isConsumerGroupExists(info.getConsumerGroup(), info.getId())) {
- throw new BusinessException(String.format("consumer group %s already exist", info.getConsumerGroup()));
- }
-
- if (info.getId() != null) {
- ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
- Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
- ConsumeStatus consumeStatus = ConsumeStatus.forCode(consumptionEntity.getStatus());
- Preconditions.checkTrue(ConsumeStatus.allowedUpdate(consumeStatus),
- "consumption not allow update when status is " + consumeStatus.name());
- }
-
- setTopicInfo(info);
- ConsumptionEntity entity = this.saveConsumption(info, operator);
- String mqType = entity.getMqType();
- if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
- savePulsarInfo(info.getMqExtInfo(), entity);
- }
-
- log.info("success to save consumption info by user={}", operator);
- return entity.getId();
- }
-
- /**
- * Save Pulsar consumption info
- */
- private void savePulsarInfo(ConsumptionMqExtBase mqExtBase, ConsumptionEntity entity) {
- Preconditions.checkNotNull(mqExtBase, "Pulsar info cannot be empty, as the middleware is Pulsar");
- // If it is transmitted from the web without specifying consumption pulsar info,
- ConsumptionPulsarInfo pulsarInfo;
- if (mqExtBase instanceof ConsumptionPulsarInfo) {
- pulsarInfo = (ConsumptionPulsarInfo) mqExtBase;
- } else {
- pulsarInfo = new ConsumptionPulsarInfo();
- }
-
- // Prerequisite for RLQ to be turned on: DLQ must be turned on
- boolean dlqEnable = (pulsarInfo.getIsDlq() != null && pulsarInfo.getIsDlq() == 1);
- boolean rlqEnable = (pulsarInfo.getIsRlq() != null && pulsarInfo.getIsRlq() == 1);
- if (rlqEnable && !dlqEnable) {
- throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
- }
-
- // When saving, the DLQ / RLQ under the same groupId cannot be repeated;
- // when closing, delete the related configuration
- String groupId = entity.getInlongGroupId();
- if (dlqEnable) {
- String dlqTopic = PREFIX_DLQ + "_" + pulsarInfo.getDeadLetterTopic();
- Boolean exist = streamService.exist(groupId, dlqTopic);
- if (exist) {
- throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_DUPLICATED);
- }
- } else {
- pulsarInfo.setIsDlq(0);
- pulsarInfo.setDeadLetterTopic(null);
- }
- if (rlqEnable) {
- String rlqTopic = PREFIX_RLQ + "_" + pulsarInfo.getRetryLetterTopic();
- Boolean exist = streamService.exist(groupId, rlqTopic);
- if (exist) {
- throw new BusinessException(ErrorCodeEnum.PULSAR_RLQ_DUPLICATED);
- }
- } else {
- pulsarInfo.setIsRlq(0);
- pulsarInfo.setRetryLetterTopic(null);
- }
-
- ConsumptionPulsarEntity pulsar = CommonBeanUtils.copyProperties(pulsarInfo, ConsumptionPulsarEntity::new);
- Integer consumptionId = entity.getId();
- pulsar.setConsumptionId(consumptionId);
- pulsar.setInlongGroupId(groupId);
- pulsar.setConsumerGroup(entity.getConsumerGroup());
- pulsar.setIsDeleted(0);
-
- // Pulsar consumer information may already exist, update if it exists, add if it does not exist
- ConsumptionPulsarEntity exists = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
- if (exists == null) {
- consumptionPulsarMapper.insert(pulsar);
- } else {
- pulsar.setId(exists.getId());
- consumptionPulsarMapper.updateByPrimaryKey(pulsar);
- }
- }
-
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public Boolean update(ConsumptionInfo info, String operator) {
- Preconditions.checkNotNull(info, "consumption info cannot be null");
- Integer consumptionId = info.getId();
- Preconditions.checkNotNull(consumptionId, "consumption id cannot be null");
-
- ConsumptionEntity exists = consumptionMapper.selectByPrimaryKey(consumptionId);
- Preconditions.checkNotNull(exists, "consumption not exist with id " + consumptionId);
- Preconditions.checkTrue(exists.getInCharges().contains(operator),
- "operator" + operator + " has no privilege for the consumption");
- String errMsg = String.format("consumption information has already updated, id=%s, curVersion=%s",
- exists.getId(), info.getVersion());
- if (!Objects.equals(exists.getVersion(), info.getVersion())) {
- LOGGER.error(errMsg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
- ConsumptionEntity entity = new ConsumptionEntity();
- Date now = new Date();
- CommonBeanUtils.copyProperties(info, entity, true);
- entity.setModifier(operator);
- entity.setModifyTime(now);
- // Modify Pulsar consumption info
- String mqType = info.getMqType();
- if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
- ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
- Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
- pulsarEntity.setConsumerGroup(info.getConsumerGroup());
-
- // Whether DLQ / RLQ is turned on or off
- ConsumptionPulsarInfo update = (ConsumptionPulsarInfo) info.getMqExtInfo();
- boolean dlqEnable = (update.getIsDlq() != null && update.getIsDlq() == 1);
- boolean rlqEnable = (update.getIsRlq() != null && update.getIsRlq() == 1);
-
- // DLQ is closed, RLQ cannot exist alone and must be closed
- if (rlqEnable && !dlqEnable) {
- throw new BusinessException(ErrorCodeEnum.PULSAR_TOPIC_CREATE_FAILED);
- }
-
- // If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to
- // add/remove inlong streams in the inlong group
- if (ConsumeStatus.APPROVE_PASSED.getCode() == exists.getStatus()) {
- String groupId = info.getInlongGroupId();
- String dlqNameOld = pulsarEntity.getDeadLetterTopic();
- String dlqNameNew = update.getDeadLetterTopic();
- if (!dlqEnable) {
- pulsarEntity.setIsDlq(0);
- pulsarEntity.setDeadLetterTopic(null);
- streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
- } else if (!Objects.equals(dlqNameNew, dlqNameOld)) {
- pulsarEntity.setIsDlq(1);
- String topic = PREFIX_DLQ + "_" + dlqNameNew;
- topic = topic.toLowerCase(Locale.ROOT);
- pulsarEntity.setDeadLetterTopic(topic);
- streamService.insertDlqOrRlq(groupId, topic, operator);
- }
-
- String rlqNameOld = pulsarEntity.getRetryLetterTopic();
- String rlqNameNew = update.getRetryLetterTopic();
- if (!rlqEnable) {
- pulsarEntity.setIsRlq(0);
- pulsarEntity.setRetryLetterTopic(null);
- streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
- } else if (!Objects.equals(rlqNameNew, pulsarEntity.getRetryLetterTopic())) {
- pulsarEntity.setIsRlq(1);
- String topic = PREFIX_RLQ + "_" + rlqNameNew;
- topic = topic.toLowerCase(Locale.ROOT);
- pulsarEntity.setRetryLetterTopic(topic);
- streamService.insertDlqOrRlq(groupId, topic, operator);
- }
- }
- consumptionPulsarMapper.updateByConsumptionId(pulsarEntity);
- }
-
- int rowCount = consumptionMapper.updateByPrimaryKeySelective(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error(errMsg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
- return true;
- }
-
- @Override
- @Transactional(rollbackFor = Throwable.class)
- public Boolean delete(Integer id, String operator) {
- ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(id);
- Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + id);
- consumptionMapper.deleteByPrimaryKey(id);
-
- consumptionPulsarMapper.deleteByConsumptionId(id);
- return true;
- }
-
- private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
- ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
- entity.setStatus(ConsumeStatus.TO_BE_SUBMIT.getCode());
- entity.setCreator(operator);
- entity.setModifier(operator);
-
- if (info.getId() != null) {
- int rowCount = consumptionMapper.updateByPrimaryKey(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error("consumption information has already updated, id={}, curVersion={}",
- entity.getId(), entity.getVersion());
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
- } else {
- consumptionMapper.insert(entity);
- }
-
- Preconditions.checkNotNull(entity.getId(), "save consumption failed");
- return entity;
- }
-
- /**
- * Set the topic for the consumption information
- */
- private void setTopicInfo(ConsumptionInfo info) {
- // Determine whether the consumed topic belongs to this groupId or the inlong stream under it
- String groupId = info.getInlongGroupId();
- InlongGroupTopicInfo topicVO = groupService.getTopic(groupId);
- Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId);
-
- // Tube’s topic is the inlong group level, one inlong group, one TubeMQ topic
- String mqType = topicVO.getMqType();
- // this class was deprecated, just comment it out
- if (MQType.TUBEMQ.equals(mqType)) {
- String mqResource = /*topicVO.getMqResource()*/ null;
- Preconditions.checkTrue(mqResource == null || mqResource.equals(info.getTopic()),
- "topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
- } else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
- // Pulsar's topic is the inlong stream level.
- // There will be multiple inlong streams under one inlong group, and there will be multiple topics
- List<InlongStreamBriefInfo> streamTopics = /*topicVO.getStreamTopics()*/ new ArrayList<>();
- if (streamTopics != null && streamTopics.size() > 0) {
- Set<String> topicSet = new HashSet<>(Arrays.asList(info.getTopic().split(",")));
- streamTopics.forEach(stream -> topicSet.remove(stream.getMqResource()));
- Preconditions.checkEmpty(topicSet, "topic [" + topicSet + "] not belong to inlong group " + groupId);
- }
- InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
- if (null != inlongGroupEntity) {
- PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
- inlongGroupEntity.getInlongClusterTag(), null, ClusterType.PULSAR);
- String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
- ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
- info.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
- inlongGroupEntity.getMqResource(), info.getTopic()));
- }
-
- }
- info.setMqType(mqType);
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index 908e61220..e34f024d6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -18,7 +18,9 @@
package org.apache.inlong.manager.service.group;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -26,6 +28,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
@@ -110,7 +113,13 @@ public class InlongGroupOperator4Pulsar extends AbstractGroupOperator {
@Override
public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
+ groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
+ String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
+ ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
+
InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
+ topicInfo.setTenant(tenant);
topicInfo.setNamespace(groupInfo.getMqResource());
// each inlong stream is associated with a Pulsar topic
List<String> topics = streamService.getTopicList(groupInfo.getInlongGroupId()).stream()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index db2f08ba5..9f3819e49 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core.heartbeat;
+package org.apache.inlong.manager.service.heartbeat;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -54,7 +53,6 @@ import java.util.concurrent.TimeUnit;
public class HeartbeatManager implements AbstractHeartbeatManager {
private static final String AUTO_REGISTERED = "auto registered";
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Getter
private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
similarity index 99%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
index 0d789a30d..af644a057 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core.impl;
+package org.apache.inlong.manager.service.heartbeat;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
@@ -44,7 +44,6 @@ import org.apache.inlong.manager.pojo.heartbeat.HeartbeatQueryRequest;
import org.apache.inlong.manager.pojo.heartbeat.HeartbeatReportRequest;
import org.apache.inlong.manager.pojo.heartbeat.StreamHeartbeatResponse;
import org.apache.inlong.manager.service.core.HeartbeatService;
-import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index 07d9ed0b3..47ec39653 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -102,10 +102,10 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
}
/**
- * Update consumption after approve
+ * Update consume info after approval
*/
- private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
- InlongConsumeEntity existEntity = consumeMapper.selectById(consumptionId);
+ private void updateConsumerInfo(Integer consumeId, String consumerGroup) {
+ InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId);
existEntity.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
existEntity.setConsumerGroup(consumerGroup);
int rowCount = consumeMapper.updateByIdSelective(existEntity);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index f66b81955..5b698b856 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -252,7 +252,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
pulsarOperator.createSubscription(pulsarAdmin, fullTopicName, pulsarInfo.getQueueModule(), subs);
log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName);
- // insert the consumer group info into the consumption table
+ // insert the consumer group info into the inlong_consume table
Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs);
log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
id, subs, groupId, topicName);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index fffd83506..6f011875c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -36,7 +36,7 @@ import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
+import org.apache.inlong.manager.service.heartbeat.HeartbeatManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
index 0cacaabb4..dad85fc21 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.heartbeat.HeartbeatManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
deleted file mode 100644
index d7d824b6d..000000000
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.common.consts.MQType;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
-import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.group.InlongGroupServiceTest;
-import org.junit.jupiter.api.Assertions;
-import org.springframework.beans.factory.annotation.Autowired;
-
-/**
- * Consumption service test
- */
-public class ConsumptionServiceTest extends ServiceBaseTest {
-
- String inlongGroupId = "group_for_consumption_test";
- String consumerGroup = "test_consumer_group";
- String operator = "admin";
-
- @Autowired
- private ConsumptionService consumptionService;
- @Autowired
- private InlongGroupServiceTest groupServiceTest;
-
- private Integer saveConsumption(String inlongGroupId, String consumerGroup, String operator) {
- ConsumptionInfo consumptionInfo = new ConsumptionInfo();
- consumptionInfo.setTopic(inlongGroupId);
- consumptionInfo.setConsumerGroup(consumerGroup);
- consumptionInfo.setInlongGroupId(inlongGroupId);
- consumptionInfo.setMqType(MQType.PULSAR);
- consumptionInfo.setCreator(operator);
- consumptionInfo.setInCharges("admin");
-
- ConsumptionPulsarInfo pulsarInfo = new ConsumptionPulsarInfo();
- pulsarInfo.setMqType(MQType.PULSAR);
- pulsarInfo.setIsDlq(1);
- pulsarInfo.setDeadLetterTopic("test_dlq");
- pulsarInfo.setIsRlq(0);
-
- consumptionInfo.setMqExtInfo(pulsarInfo);
-
- return consumptionService.save(consumptionInfo, operator);
- }
-
- // Online test will be BusinessException: Inlong group does not exist/no operation authority
- // @Test
- public void testSaveAndDelete() {
- groupServiceTest.saveGroup(inlongGroupId, operator);
- Integer id = this.saveConsumption(inlongGroupId, consumerGroup, operator);
- Assertions.assertNotNull(id);
- boolean result = consumptionService.delete(id, operator);
- Assertions.assertTrue(result);
- }
-}
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 5fe7e6076..951d854c9 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -130,7 +130,7 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
`type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
`ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
`port` int(6) NULL COMMENT 'Cluster port',
- `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
+ `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
`description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node',
`status` int(4) DEFAULT '0' COMMENT 'Cluster status',
@@ -197,45 +197,6 @@ CREATE TABLE IF NOT EXISTS `data_node`
UNIQUE KEY `unique_data_node` (`name`, `type`, `is_deleted`)
);
--- ----------------------------
--- Deprecated: Table structure for consumption
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `consumption`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group',
- `in_charges` varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `mq_type` varchar(10) DEFAULT 'TUBE' COMMENT 'Message queue type, high throughput: TUBE, high consistency: PULSAR',
- `topic` varchar(256) NOT NULL COMMENT 'Consumption topic',
- `filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
- `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream ID for consumption, if filter_enable is 1, it cannot empty',
- `status` int(4) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'creator',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
- PRIMARY KEY (`id`)
-);
-
--- ----------------------------
--- Deprecated: Table structure for consumption_pulsar
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `consumption_pulsar`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `consumption_id` int(11) DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
- `consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group ID',
- `is_rlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
- `retry_letter_topic` varchar(256) DEFAULT NULL COMMENT 'The name of the retry queue topic',
- `is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
- `dead_letter_topic` varchar(256) DEFAULT NULL COMMENT 'dead letter topic name',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- PRIMARY KEY (`id`)
-);
-- ----------------------------
-- Table structure for stream_source_cmd_config
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 6a044065e..fbd73961f 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -140,7 +140,7 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
`type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
`ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
`port` int(6) NULL COMMENT 'Cluster port',
- `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
+ `protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
`description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node',
`status` int(4) DEFAULT '0' COMMENT 'Cluster status',
@@ -209,48 +209,6 @@ CREATE TABLE IF NOT EXISTS `data_node`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Data node table';
--- ----------------------------
--- Deprecated: Table structure for consumption
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `consumption`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group',
- `in_charges` varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `mq_type` varchar(10) DEFAULT 'TUBE' COMMENT 'Message queue type, high throughput: TUBE, high consistency: PULSAR',
- `topic` varchar(256) NOT NULL COMMENT 'Consumption topic',
- `filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
- `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream ID for consumption, if filter_enable is 1, it cannot empty',
- `status` int(4) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'creator',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
- PRIMARY KEY (`id`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
-
--- ----------------------------
--- Deprecated: Table structure for consumption_pulsar
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `consumption_pulsar`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `consumption_id` int(11) DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
- `consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group ID',
- `is_rlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
- `retry_letter_topic` varchar(256) DEFAULT NULL COMMENT 'The name of the retry queue topic',
- `is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
- `dead_letter_topic` varchar(256) DEFAULT NULL COMMENT 'dead letter topic name',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- PRIMARY KEY (`id`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
-
-- ----------------------------
-- Table structure for stream_source_cmd_config
-- ----------------------------
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
deleted file mode 100644
index f1c77ad51..000000000
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.web.controller;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.pojo.common.PageResult;
-import org.apache.inlong.manager.pojo.common.Response;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.consume.InlongConsumeProcessService;
-import org.apache.inlong.manager.service.operationlog.OperationLog;
-import org.apache.inlong.manager.service.user.LoginUserUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.validation.annotation.Validated;
-import org.springframework.web.bind.annotation.DeleteMapping;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-/**
- * Data consumption interface
- */
-@RestController
-@RequestMapping("/api")
-@Api(tags = "Consumption-API")
-public class ConsumptionController {
-
- @Autowired
- private ConsumptionService consumptionService;
- @Autowired
- private InlongConsumeProcessService processOperation;
-
- @GetMapping("/consumption/summary")
- @ApiOperation(value = "Get data consumption summary")
- public Response<ConsumptionSummary> getSummary(ConsumptionQuery query) {
- query.setUsername(LoginUserUtils.getLoginUser().getName());
- return Response.success(consumptionService.getSummary(query));
- }
-
- @GetMapping("/consumption/list")
- @ApiOperation(value = "List data consumptions")
- public Response<PageResult<ConsumptionListVo>> list(ConsumptionQuery query) {
- query.setUsername(LoginUserUtils.getLoginUser().getName());
- return Response.success(consumptionService.list(query));
- }
-
- @GetMapping("/consumption/get/{id}")
- @ApiOperation(value = "Get consumption details")
- @ApiImplicitParam(name = "id", value = "Consumption ID", dataTypeClass = Integer.class, required = true)
- public Response<ConsumptionInfo> getDetail(@PathVariable(name = "id") Integer id) {
- return Response.success(consumptionService.get(id));
- }
-
- @DeleteMapping("/consumption/delete/{id}")
- @OperationLog(operation = OperationType.DELETE)
- @ApiOperation(value = "Delete data consumption")
- @ApiImplicitParam(name = "id", value = "Consumption ID", dataTypeClass = Integer.class, required = true)
- public Response<Object> delete(@PathVariable(name = "id") Integer id) {
- this.consumptionService.delete(id, LoginUserUtils.getLoginUser().getName());
- return Response.success();
- }
-
- @PostMapping("/consumption/save")
- @OperationLog(operation = OperationType.UPDATE)
- @ApiOperation(value = "Save data consumption", notes = "Full coverage")
- public Response<Integer> save(@Validated @RequestBody ConsumptionInfo consumptionInfo) {
- String currentUser = LoginUserUtils.getLoginUser().getName();
- return Response.success(consumptionService.save(consumptionInfo, currentUser));
- }
-
- @PostMapping("/consumption/update/{id}")
- @OperationLog(operation = OperationType.UPDATE)
- @ApiOperation(value = "Update data consumption")
- public Response<String> update(@PathVariable(name = "id") Integer id,
- @Validated @RequestBody ConsumptionInfo consumptionInfo) {
- consumptionInfo.setId(id);
- consumptionService.update(consumptionInfo, LoginUserUtils.getLoginUser().getName());
- return Response.success();
- }
-
- @PostMapping("/consumption/startProcess/{id}")
- @OperationLog(operation = OperationType.UPDATE)
- @ApiOperation(value = "Start approval process")
- @ApiImplicitParam(name = "id", value = "Consumption ID", dataTypeClass = Integer.class, required = true)
- public Response<WorkflowResult> startProcess(@PathVariable(name = "id") Integer id) {
- String username = LoginUserUtils.getLoginUser().getName();
- return Response.success(processOperation.startProcess(id, username));
- }
-
-}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowConfig.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowConfig.java
deleted file mode 100644
index f86d9f0d1..000000000
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowConfig.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
-import org.apache.inlong.manager.workflow.core.ProcessDefinitionRepository;
-import org.apache.inlong.manager.workflow.core.WorkflowQueryService;
-import org.springframework.transaction.PlatformTransactionManager;
-
-/**
- * Workflow config
- */
-public class WorkflowConfig {
-
- @JsonIgnore
- private WorkflowQueryService queryService;
- @JsonIgnore
- private WorkflowProcessEntityMapper processEntityMapper;
- @JsonIgnore
- private WorkflowTaskEntityMapper taskEntityMapper;
- @JsonIgnore
- private WorkflowEventLogEntityMapper eventLogMapper;
- @JsonIgnore
- private ProcessDefinitionRepository definitionRepository;
- @JsonIgnore
- private PlatformTransactionManager transactionManager;
-
- public WorkflowQueryService getQueryService() {
- return queryService;
- }
-
- public WorkflowConfig setQueryService(WorkflowQueryService queryService) {
- this.queryService = queryService;
- return this;
- }
-
- public WorkflowProcessEntityMapper getProcessEntityMapper() {
- return processEntityMapper;
- }
-
- public WorkflowConfig setProcessEntityMapper(WorkflowProcessEntityMapper processEntityMapper) {
- this.processEntityMapper = processEntityMapper;
- return this;
- }
-
- public WorkflowTaskEntityMapper getTaskEntityMapper() {
- return taskEntityMapper;
- }
-
- public WorkflowConfig setTaskEntityMapper(WorkflowTaskEntityMapper taskEntityMapper) {
- this.taskEntityMapper = taskEntityMapper;
- return this;
- }
-
- public WorkflowEventLogEntityMapper getEventLogMapper() {
- return eventLogMapper;
- }
-
- public WorkflowConfig setEventLogMapper(WorkflowEventLogEntityMapper eventLogMapper) {
- this.eventLogMapper = eventLogMapper;
- return this;
- }
-
- public ProcessDefinitionRepository getDefinitionRepository() {
- return definitionRepository;
- }
-
- public WorkflowConfig setDefinitionRepository(ProcessDefinitionRepository definitionRepository) {
- this.definitionRepository = definitionRepository;
- return this;
- }
-
- public PlatformTransactionManager getTransactionManager() {
- return transactionManager;
- }
-
- public WorkflowConfig setTransactionManager(PlatformTransactionManager transactionManager) {
- this.transactionManager = transactionManager;
- return this;
- }
-
-}