You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/20 04:05:38 UTC
[inlong] branch master updated: [INLONG-6151][Manager] Add data cleansing task and optimize query indexes (#6168)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new eb24f3197 [INLONG-6151][Manager] Add data cleansing task and optimize query indexes (#6168)
eb24f3197 is described below
commit eb24f31970560bc2d55663fd37a49a99fa46f1f5
Author: woofyzhao <49...@qq.com>
AuthorDate: Thu Oct 20 12:05:32 2022 +0800
[INLONG-6151][Manager] Add data cleansing task and optimize query indexes (#6168)
* Add optional data cleansing to control storage size
* Add query indexes
* Change the config naming and optimize the select performance
* Add default environment variables and boost script processing
Co-authored-by: healchow <he...@gmail.com>
---
inlong-audit/sql/apache_inlong_audit.sql | 2 +-
.../dao/mapper/InlongGroupEntityMapper.java | 22 ++-
.../dao/mapper/InlongGroupExtEntityMapper.java | 9 +-
.../dao/mapper/InlongStreamEntityMapper.java | 8 +-
.../dao/mapper/InlongStreamExtEntityMapper.java | 8 +-
.../dao/mapper/InlongStreamFieldEntityMapper.java | 9 +-
.../manager/dao/mapper/StreamSinkEntityMapper.java | 11 +-
.../dao/mapper/StreamSinkFieldEntityMapper.java | 22 +--
.../dao/mapper/StreamSourceEntityMapper.java | 11 +-
.../dao/mapper/StreamSourceFieldEntityMapper.java | 9 +-
.../dao/mapper/StreamTransformEntityMapper.java | 9 +-
.../mapper/StreamTransformFieldEntityMapper.java | 29 ++--
.../dao/mapper/WorkflowEventLogEntityMapper.java | 8 ++
.../dao/mapper/WorkflowProcessEntityMapper.java | 13 ++
.../dao/mapper/WorkflowTaskEntityMapper.java | 7 +
.../resources/mappers/InlongGroupEntityMapper.xml | 18 ++-
.../mappers/InlongGroupExtEntityMapper.xml | 11 +-
.../resources/mappers/InlongStreamEntityMapper.xml | 12 +-
.../mappers/InlongStreamExtEntityMapper.xml | 16 +--
.../mappers/InlongStreamFieldEntityMapper.xml | 9 +-
.../resources/mappers/StreamSinkEntityMapper.xml | 12 +-
.../mappers/StreamSinkFieldEntityMapper.xml | 8 ++
.../resources/mappers/StreamSourceEntityMapper.xml | 8 ++
.../mappers/StreamSourceFieldEntityMapper.xml | 8 ++
.../mappers/StreamTransformEntityMapper.xml | 10 +-
.../mappers/StreamTransformFieldEntityMapper.xml | 19 ++-
.../mappers/WorkflowEventLogEntityMapper.xml | 9 ++
.../mappers/WorkflowProcessEntityMapper.xml | 17 +++
.../resources/mappers/WorkflowTaskEntityMapper.xml | 8 ++
inlong-manager/manager-docker/Dockerfile | 1 +
inlong-manager/manager-docker/manager-docker.sh | 5 +
.../repository/DataProxyConfigRepository.java | 2 +-
.../service/sink/StreamSinkServiceImpl.java | 2 +-
.../manager/service/task/DataCleansingTask.java | 153 +++++++++++++++++++++
.../repository/DataProxyConfigRepositoryTest.java | 2 +-
.../main/resources/h2/apache_inlong_manager.sql | 50 ++++---
.../manager-web/sql/apache_inlong_manager.sql | 47 ++++---
.../src/main/resources/application-dev.properties | 10 ++
.../src/main/resources/application-prod.properties | 10 ++
.../src/main/resources/application-test.properties | 10 ++
.../src/main/resources/application.properties | 5 +-
41 files changed, 526 insertions(+), 113 deletions(-)
diff --git a/inlong-audit/sql/apache_inlong_audit.sql b/inlong-audit/sql/apache_inlong_audit.sql
index 5ea62c4b4..88784c49b 100644
--- a/inlong-audit/sql/apache_inlong_audit.sql
+++ b/inlong-audit/sql/apache_inlong_audit.sql
@@ -46,6 +46,6 @@ CREATE TABLE IF NOT EXISTS `audit_data`
`size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message size',
`delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message delay count',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
- INDEX ip_packet (`ip`, `inlong_group_id`, `inlong_stream_id`, `log_ts`)
+ INDEX group_stream_audit_id (`inlong_group_id`, `inlong_stream_id`, `audit_id`, `log_ts`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data table';
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
index b54fba22f..b9ba70058 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.springframework.stereotype.Repository;
+import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -53,6 +54,18 @@ public interface InlongGroupEntityMapper {
*/
List<SortSourceGroupInfo> selectAllGroups();
+ /**
+ * Select all groups which are logical deleted before the specified last modify time
+ * <p/>
+ * Note, ensure that all the group ids found have been deleted,
+ * and the group ids not deleted (is_deleted=0) should not be returned.
+ *
+ * @param timeBefore the latest modify time before which to select
+ * @param limit max item count
+ * @return all matched group ids
+ */
+ List<String> selectDeletedGroupIds(@Param("timeBefore") Date timeBefore, @Param("limit") Integer limit);
+
int updateByPrimaryKey(InlongGroupEntity record);
int updateByIdentifierSelective(InlongGroupEntity record);
@@ -62,4 +75,11 @@ public interface InlongGroupEntityMapper {
int deleteByPrimaryKey(Integer id);
-}
\ No newline at end of file
+ /**
+ * Physically delete all inlong groups based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
index 48ef53044..7edd00626 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
@@ -66,4 +66,11 @@ public interface InlongGroupExtEntityMapper {
*/
int logicDeleteAllByGroupId(String groupId);
-}
\ No newline at end of file
+ /**
+ * Physically delete all extended fields based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
index 006f26c4e..66f101a27 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
@@ -62,13 +62,13 @@ public interface InlongStreamEntityMapper {
*/
void logicDeleteDlqOrRlq(String groupId, String streamId, String operator);
- int deleteByPrimaryKey(Integer id);
+ int deleteById(Integer id);
/**
- * Physically delete all inlong streams of the specified inlong group id
+ * Physically delete all inlong streams based on inlong group ids
*
* @return rows deleted
*/
- int deleteAllByGroupId(@Param("groupId") String groupId);
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
index 94b65419e..f59e9df94 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
@@ -53,13 +53,13 @@ public interface InlongStreamExtEntityMapper {
*/
int logicDeleteAllByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
- int deleteByPrimaryKey(Integer id);
+ int deleteById(Integer id);
/**
- * Physically delete all extension fields based on group id and stream id
+ * Physically delete all extension fields based on inlong group ids
*
* @return rows deleted
*/
- int deleteAllByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
index 9a5465a06..e66b4987d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
@@ -47,4 +47,11 @@ public interface InlongStreamFieldEntityMapper {
*/
int deleteAllByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
-}
\ No newline at end of file
+ /**
+ * Physically delete all stream fields based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 2662e9c0a..7142976dc 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -142,6 +142,13 @@ public interface StreamSinkEntityMapper {
int updateStatus(StreamSinkEntity entity);
- int deleteByPrimaryKey(Integer id);
+ int deleteById(Integer id);
-}
\ No newline at end of file
+ /**
+ * Physically delete all stream sinks based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
index a75027b4d..b487deda7 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
@@ -38,24 +38,30 @@ public interface StreamSinkFieldEntityMapper {
* According to the sink id, query the sink field.
*
* @param sinkId sink id.
- * @return Sink field list.
+ * @return sink field list.
*/
List<StreamSinkFieldEntity> selectBySinkId(@Param("sinkId") Integer sinkId);
/**
- * According to the sink id, logically delete the corresponding field information.
+ * Logically delete all stream sink fields based on sink id
*
- * @param sinkId sink id.
- * @return rows deleted.
+ * @param sinkId sink id
+ * @return rows deleted
*/
int logicDeleteAll(@Param("sinkId") Integer sinkId);
/**
- * According to the sink id, physically delete the corresponding field information
+ * Physically delete all stream sink fields based on sink id
*
- * @param sinkId sink id.
- * @return rows deleted.
+ * @return rows deleted
*/
int deleteAll(@Param("sinkId") Integer sinkId);
-}
\ No newline at end of file
+ /**
+ * Physically delete all stream sink fields based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 8cc63e4e3..6e851e11c 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -130,8 +130,15 @@ public interface StreamSourceEntityMapper {
int updateSnapshot(StreamSourceEntity entity);
/**
- * Physical delete stream sources.
+ * Physical delete stream sources by group id and stream id
*/
int deleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
-}
\ No newline at end of file
+ /**
+ * Physically delete all stream sources based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
index de5960ef3..0059e8362 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
@@ -59,4 +59,11 @@ public interface StreamSourceFieldEntityMapper {
*/
int deleteAll(@Param("sourceId") Integer sourceId);
-}
\ No newline at end of file
+ /**
+ * Physically delete all stream source fields based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
index c4bc4ca1a..a087624a0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
@@ -41,4 +41,11 @@ public interface StreamTransformEntityMapper {
int deleteById(Integer id);
-}
\ No newline at end of file
+ /**
+ * Physically delete all stream transforms based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformFieldEntityMapper.java
index a38885440..8e542be17 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformFieldEntityMapper.java
@@ -26,25 +26,17 @@ import java.util.List;
@Repository
public interface StreamTransformFieldEntityMapper {
- int deleteByPrimaryKey(Integer id);
-
int insert(StreamTransformFieldEntity record);
int insertSelective(StreamTransformFieldEntity record);
/**
- * Selete undeleted transform field by transform id.
- *
- * @param transformId
- * @return
+ * Select undeleted transform field by transform id.
*/
List<StreamTransformFieldEntity> selectByTransformId(@Param("transformId") Integer transformId);
/**
- * Selete undeleted transform field by transform ids.
- *
- * @param transformIds
- * @return
+ * Select undeleted transform field by transform ids.
*/
List<StreamTransformFieldEntity> selectByTransformIds(@Param("transformIds") List<Integer> transformIds);
@@ -56,16 +48,21 @@ public interface StreamTransformFieldEntityMapper {
/**
* Insert all field list
- *
- * @param fieldList
*/
void insertAll(@Param("list") List<StreamTransformFieldEntity> fieldList);
+ int deleteById(Integer id);
+
/**
* Delete all field list by transformId
- *
- * @param transformId
- * @return
*/
int deleteAll(@Param("transformId") Integer transformId);
-}
\ No newline at end of file
+
+ /**
+ * Physically delete all stream transform fields based on inlong group ids
+ *
+ * @return rows deleted
+ */
+ int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowEventLogEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowEventLogEntityMapper.java
index 2927601fb..f17d80d1b 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowEventLogEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowEventLogEntityMapper.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.dao.mapper;
+import org.apache.ibatis.annotations.Param;
import org.apache.inlong.manager.dao.entity.WorkflowEventLogEntity;
import org.apache.inlong.manager.pojo.workflow.EventLogRequest;
import org.springframework.stereotype.Repository;
@@ -37,4 +38,11 @@ public interface WorkflowEventLogEntityMapper {
int update(WorkflowEventLogEntity record);
+ /**
+ * Physically delete all event logs based on process ids
+ *
+ * @return rows deleted
+ */
+ int deleteByProcessIds(@Param("processIdList") List<Integer> processIdList);
+
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowProcessEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowProcessEntityMapper.java
index 2834bf47a..9a8080c9a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowProcessEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowProcessEntityMapper.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.dao.mapper;
+import org.apache.ibatis.annotations.Param;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.pojo.common.CountInfo;
import org.apache.inlong.manager.pojo.workflow.ProcessCountRequest;
@@ -37,8 +38,20 @@ public interface WorkflowProcessEntityMapper {
List<WorkflowProcessEntity> selectByCondition(ProcessRequest query);
+ /**
+ * Select process ids based on inlong group id list
+ */
+ List<Integer> selectByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
List<CountInfo> countByQuery(ProcessCountRequest query);
void update(WorkflowProcessEntity workflowProcessEntity);
+ /**
+ * Physically delete all process infos based on process ids
+ *
+ * @return rows deleted
+ */
+ int deleteByProcessIds(@Param("processIdList") List<Integer> processIdList);
+
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowTaskEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowTaskEntityMapper.java
index 90fab1cac..786ecf0e2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowTaskEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowTaskEntityMapper.java
@@ -48,4 +48,11 @@ public interface WorkflowTaskEntityMapper {
int update(WorkflowTaskEntity workflowTaskEntity);
+ /**
+ * Physically delete all task infos based on process ids
+ *
+ * @return rows deleted
+ */
+ int deleteByProcessIds(@Param("processIdList") List<Integer> processIdList);
+
}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index f2b2b29e0..e3b832339 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -208,6 +208,14 @@
from inlong_group
where is_deleted = 0
</select>
+ <select id="selectDeletedGroupIds" resultType="java.lang.String">
+ select inlong_group_id
+ from inlong_group
+ where modify_time <= #{timeBefore, jdbcType=TIMESTAMP}
+ group by inlong_group_id
+ having min(is_deleted) > 0
+ limit #{limit, jdbcType=INTEGER}
+ </select>
<update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
update inlong_group
@@ -317,4 +325,12 @@
from inlong_group
where id = #{id,jdbcType=INTEGER}
</delete>
-</mapper>
\ No newline at end of file
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from inlong_group
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
index 4ac134c6f..7093ccb89 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
@@ -150,4 +150,13 @@
where inlong_group_id = #{groupId, jdbcType=VARCHAR}
and is_deleted = 0
</update>
-</mapper>
\ No newline at end of file
+
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from inlong_group_ext
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
index f6b3a549a..136c33f12 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
@@ -413,15 +413,17 @@
and is_deleted = 0
</update>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ <delete id="deleteById" parameterType="java.lang.Integer">
delete
from inlong_stream
where id = #{id,jdbcType=INTEGER}
</delete>
- <delete id="deleteAllByGroupId">
+ <delete id="deleteByInlongGroupIds">
delete
from inlong_stream
- where inlong_group_id = #{groupId, jdbcType=VARCHAR}
- and is_deleted = 0
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
</delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
index efa7c726e..7e62e6e35 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
@@ -99,19 +99,17 @@
</if>
</update>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ <delete id="deleteById" parameterType="java.lang.Integer">
delete
from inlong_stream_ext
where id = #{id,jdbcType=INTEGER}
</delete>
-
- <delete id="deleteAllByRelatedId">
+ <delete id="deleteByInlongGroupIds">
delete
from inlong_stream_ext
- where inlong_group_id = #{groupId, jdbcType=VARCHAR}
- <if test="streamId != null and streamId != ''">
- and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
- </if>
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
</delete>
-
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index 75f389ee0..de08a9aa6 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -125,5 +125,12 @@
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
and is_deleted = 0
</delete>
-
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from inlong_stream_field
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 12e3bc6c0..c4af01696 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -439,9 +439,17 @@
where id = #{id,jdbcType=INTEGER}
</update>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ <delete id="deleteById" parameterType="java.lang.Integer">
delete
from stream_sink
where id = #{id,jdbcType=INTEGER}
</delete>
-</mapper>
\ No newline at end of file
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from stream_sink
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 052b46556..8c85171ff 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -132,4 +132,12 @@
from stream_sink_field
where sink_id = #{sinkId,jdbcType=INTEGER}
</delete>
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from stream_sink_field
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 07bb4e8d4..7b1d21071 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -401,4 +401,12 @@
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</if>
</delete>
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from stream_source
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index 0bf9acd9b..f4ba00366 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -258,4 +258,12 @@
from stream_source_field
where source_id = #{sourceId,jdbcType=INTEGER}
</delete>
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from stream_source_field
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
index 65c299556..302e5f81f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
@@ -202,4 +202,12 @@
from stream_transform
where id = #{id,jdbcType=INTEGER}
</delete>
-</mapper>
\ No newline at end of file
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from stream_transform
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
index f35c28675..25147dbf2 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
@@ -67,11 +67,7 @@
from stream_transform_field
where id = #{id,jdbcType=INTEGER}
</select>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete
- from stream_transform_field
- where id = #{id,jdbcType=INTEGER}
- </delete>
+
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
insert into stream_transform_field (id, inlong_group_id, inlong_stream_id,
@@ -299,9 +295,22 @@
</foreach>
</insert>
+ <delete id="deleteById" parameterType="java.lang.Integer">
+ delete
+ from stream_transform_field
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
<delete id="deleteAll">
delete
from stream_transform_field
where transform_id = #{transformId,jdbcType=INTEGER}
</delete>
+ <delete id="deleteByInlongGroupIds">
+ delete
+ from stream_transform_field
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowEventLogEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowEventLogEntityMapper.xml
index 97dad343b..3147b1f74 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowEventLogEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowEventLogEntityMapper.xml
@@ -136,4 +136,13 @@
remark = #{remark,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=INTEGER}
</update>
+
+ <delete id="deleteByProcessIds">
+ delete
+ from workflow_event_log
+ where process_id in
+ <foreach item="item" index="index" collection="processIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
index 3a46eef18..1969918b4 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
@@ -111,6 +111,14 @@
</where>
order by id desc
</select>
+ <select id="selectByInlongGroupIds" resultType="java.lang.Integer">
+ select id
+ from workflow_process
+ where inlong_group_id in
+ <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </select>
<select id="countByQuery" parameterType="org.apache.inlong.manager.pojo.workflow.ProcessCountRequest"
resultType="org.apache.inlong.manager.pojo.common.CountInfo">
@@ -155,4 +163,13 @@
ext_params = #{extParams,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=INTEGER}
</update>
+
+ <delete id="deleteByProcessIds">
+ delete
+ from workflow_process
+ where id in
+ <foreach item="item" index="index" collection="processIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml
index 8746164fd..84294c128 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml
@@ -187,4 +187,12 @@
where id = #{id,jdbcType=INTEGER}
</update>
+ <delete id="deleteByProcessIds">
+ delete
+ from workflow_task
+ where process_id in
+ <foreach item="item" index="index" collection="processIdList" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ </delete>
</mapper>
diff --git a/inlong-manager/manager-docker/Dockerfile b/inlong-manager/manager-docker/Dockerfile
index c12a6d82f..405b3e533 100644
--- a/inlong-manager/manager-docker/Dockerfile
+++ b/inlong-manager/manager-docker/Dockerfile
@@ -27,6 +27,7 @@ ENV JDBC_URL=127.0.0.1:3306
ENV USERNAME=root
ENV PASSWORD=inlong
ENV ZK_URL=127.0.0.1:2181
+ENV CLEANSING_ENABLE=false
# support download plugins from remote address.
ENV PLUGINS_URL=default
# for flink-sort-plugin.properties
diff --git a/inlong-manager/manager-docker/manager-docker.sh b/inlong-manager/manager-docker/manager-docker.sh
index 7167a97d2..924f07ea9 100644
--- a/inlong-manager/manager-docker/manager-docker.sh
+++ b/inlong-manager/manager-docker/manager-docker.sh
@@ -33,6 +33,11 @@ sed -i "s/spring.profiles.active=.*$/spring.profiles.active=${ACTIVE_PROFILE}/g"
sed -i "s/127.0.0.1:3306/${JDBC_URL}/g" "${conf_file}"
sed -i "s/datasource.druid.username=.*$/datasource.druid.username=${USERNAME}/g" "${conf_file}"
sed -i "s/datasource.druid.password=.*$/datasource.druid.password=${PASSWORD}/g" "${conf_file}"
+# for data cleansing
+sed -i "s/data.cleansing.enabled=.*$/data.cleansing.enabled=${CLEANSING_ENABLE}/g" "${conf_file}"
+sed -i "s/data.cleansing.interval.seconds=.*$/data.cleansing.interval.seconds=${CLEANSING_INTERVAL}/g" "${conf_file}"
+sed -i "s/data.cleansing.before.days=.*$/data.cleansing.before.days=${CLEANSING_BEFORE_DAYS}/g" "${conf_file}"
+sed -i "s/data.cleansing.batchSize=.*$/data.cleansing.batchSize=${CLEANSING_BATCHSIZE}/g" "${conf_file}"
# for audit data
sed -i "s/audit.ck.jdbcUrl=.*$/audit.ck.jdbcUrl=${AUDIT_CK_URL}/g" "${conf_file}"
sed -i "s/audit.ck.username=.*$/audit.ck.username=${AUDIT_CK_USERNAME}/g" "${conf_file}"
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 5c447e4ef..d49eb9a34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -661,7 +661,7 @@ public class DataProxyConfigRepository implements IRepository {
}
// delete old stream sink
deleteStreamSinks.forEach((v) -> {
- streamSinkMapper.deleteByPrimaryKey(v.getId());
+ streamSinkMapper.deleteById(v.getId());
});
return inlongGroupId;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4e6c81278..1ae1e9498 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -384,7 +384,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isNotEmpty(entityList)) {
entityList.forEach(entity -> {
- sinkMapper.deleteByPrimaryKey(entity.getId());
+ sinkMapper.deleteById(entity.getId());
sinkFieldMapper.deleteAll(entity.getId());
});
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
new file mode 100644
index 000000000..bb811c285
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.task;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamTransformFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
+import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
+import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Data cleansing task, it will be executed periodically if turned on.
+ */
+@Slf4j
+@Service
+public class DataCleansingTask extends TimerTask implements InitializingBean {
+
+ /**
+ * The execution starts after this delay in seconds.
+ */
+ private static final int INITIAL_DELAY = 60;
+
+ @Value("${data.cleansing.enabled:false}")
+ private Boolean enabled;
+ @Value("${data.cleansing.interval.seconds:1800}")
+ private Integer interval;
+ @Value("${data.cleansing.before.days:10}")
+ private Integer before;
+ @Value("${data.cleansing.batchSize:100}")
+ private Integer batchSize;
+
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
+ @Autowired
+ private InlongGroupExtEntityMapper groupExtMapper;
+ @Autowired
+ private InlongStreamEntityMapper streamMapper;
+ @Autowired
+ private InlongStreamExtEntityMapper streamExtMapper;
+ @Autowired
+ private InlongStreamFieldEntityMapper streamFieldMapper;
+ @Autowired
+ private StreamSinkEntityMapper streamSinkMapper;
+ @Autowired
+ private StreamSinkFieldEntityMapper streamSinkFieldMapper;
+ @Autowired
+ private StreamSourceEntityMapper streamSourceMapper;
+ @Autowired
+ private StreamSourceFieldEntityMapper streamSourceFieldMapper;
+ @Autowired
+ private StreamTransformEntityMapper streamTransformMapper;
+ @Autowired
+ private StreamTransformFieldEntityMapper streamTransformFieldMapper;
+
+ @Autowired
+ private WorkflowProcessEntityMapper workflowProcessMapper;
+ @Autowired
+ private WorkflowTaskEntityMapper workflowTaskMapper;
+ @Autowired
+ private WorkflowEventLogEntityMapper workflowEventLogMapper;
+
+ @Override
+ public void afterPropertiesSet() {
+ if (enabled) {
+ log.info("start data cleansing timer task");
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleWithFixedDelay(this, INITIAL_DELAY, interval, TimeUnit.SECONDS);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ // get the date before `before` days
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ calendar.add(Calendar.DAY_OF_MONTH, -before);
+
+ Date daysBefore = calendar.getTime();
+ List<String> groupIds = groupMapper.selectDeletedGroupIds(daysBefore, batchSize);
+ if (CollectionUtils.isEmpty(groupIds)) {
+ return;
+ }
+
+ log.info("begin to delete data before {}, group ids {}", daysBefore, groupIds);
+
+ // cleanse configuration data
+ groupMapper.deleteByInlongGroupIds(groupIds);
+ groupExtMapper.deleteByInlongGroupIds(groupIds);
+ streamMapper.deleteByInlongGroupIds(groupIds);
+ streamExtMapper.deleteByInlongGroupIds(groupIds);
+ streamFieldMapper.deleteByInlongGroupIds(groupIds);
+
+ streamSinkMapper.deleteByInlongGroupIds(groupIds);
+ streamSinkFieldMapper.deleteByInlongGroupIds(groupIds);
+ streamSourceMapper.deleteByInlongGroupIds(groupIds);
+ streamSourceFieldMapper.deleteByInlongGroupIds(groupIds);
+ streamTransformMapper.deleteByInlongGroupIds(groupIds);
+ streamTransformFieldMapper.deleteByInlongGroupIds(groupIds);
+
+ // cleanse workflow data
+ List<Integer> processIds = workflowProcessMapper.selectByInlongGroupIds(groupIds);
+ workflowProcessMapper.deleteByProcessIds(processIds);
+ workflowTaskMapper.deleteByProcessIds(processIds);
+ workflowEventLogMapper.deleteByProcessIds(processIds);
+
+ log.info("success to delete data before {}, group ids size: {}", daysBefore, groupIds.size());
+ } catch (Exception e) {
+ log.error("exception while cleansing data from db: ", e);
+ }
+ }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
index fc726976b..87959e5bb 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
@@ -149,7 +149,7 @@ public class DataProxyConfigRepositoryTest {
StreamSinkEntityMapper mapper = PowerMockito.mock(StreamSinkEntityMapper.class);
PowerMockito.when(mapper.selectByCondition(any())).thenReturn(streamSinks);
PowerMockito.when(mapper.insert(any())).thenReturn(1);
- PowerMockito.when(mapper.deleteByPrimaryKey(anyInt())).thenReturn(1);
+ PowerMockito.when(mapper.deleteById(anyInt())).thenReturn(1);
return mapper;
}
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 91b731723..5fe7e6076 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -50,7 +50,9 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
- INDEX group_status_deleted_index (`status`, `is_deleted`)
+ INDEX `group_status_deleted_index` (`status`, `is_deleted`),
+ INDEX `group_modify_time_index` (`modify_time`),
+ INDEX `group_cluster_tag_index` (`inlong_cluster_tag`)
);
-- ----------------------------
@@ -65,8 +67,8 @@ CREATE TABLE IF NOT EXISTS `inlong_group_ext`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- KEY `group_id_index` (`inlong_group_id`),
- UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`)
+ UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`),
+ INDEX `group_ext_group_index` (`inlong_group_id`)
);
-- ----------------------------
@@ -114,7 +116,8 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`),
+ INDEX `cluster_type_index` (`type`)
);
-- ----------------------------
@@ -232,8 +235,7 @@ CREATE TABLE IF NOT EXISTS `consumption_pulsar`
`dead_letter_topic` varchar(256) DEFAULT NULL COMMENT 'dead letter topic name',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`)
-) COMMENT ='Pulsar consumption table';
-
+);
-- ----------------------------
-- Table structure for stream_source_cmd_config
-- ----------------------------
@@ -248,7 +250,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_cmd_config`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`result_info` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`),
- KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
+ UNIQUE KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
);
-- ----------------------------
@@ -282,7 +284,8 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`),
+ INDEX `stream_group_id_index` (`inlong_group_id`)
);
-- ----------------------------
@@ -299,7 +302,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_inlong_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
- KEY `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
+ INDEX `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
);
-- ----------------------------
@@ -322,7 +325,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `field_stream_id_index` (`inlong_stream_id`)
+ INDEX `stream_field_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
);
-- ----------------------------
@@ -377,9 +380,9 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
- KEY `source_status_index` (`status`, `is_deleted`),
- KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`),
- KEY `template_id_index` (`template_id`)
+ INDEX `source_status_index` (`status`, `is_deleted`),
+ INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`),
+ INDEX `source_template_id_index` (`template_id`)
);
-- ----------------------------
@@ -443,8 +446,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `source_id` int(11) NOT NULL COMMENT 'Sink id',
- `source_type` varchar(15) NOT NULL COMMENT 'Sink type',
+ `source_id` int(11) NOT NULL COMMENT 'Source id',
+ `source_type` varchar(15) NOT NULL COMMENT 'Source type',
`field_name` varchar(120) NOT NULL COMMENT 'field name',
`field_value` varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
`pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
@@ -456,7 +459,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `source_id_index` (`source_id`)
+ INDEX `source_id_index` (`source_id`),
+ INDEX `source_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
);
-- ----------------------------
@@ -483,7 +487,8 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
-- The source node name of the transport field
`origin_field_name` varchar(50) DEFAULT '' COMMENT 'Origin field name before transform operation',
PRIMARY KEY (`id`),
- KEY `transform_id_index` (`transform_id`)
+ INDEX `transform_id_index` (`transform_id`),
+ INDEX `transform_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
);
-- ----------------------------
@@ -509,7 +514,9 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
`origin_field_name` varchar(50) DEFAULT '' COMMENT 'Origin field name before transform operation',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ INDEX `sink_id_index` (`sink_id`),
+ INDEX `sink_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
);
-- ----------------------------
@@ -592,7 +599,7 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
`version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
- KEY `process_name_task_name_index` (`process_name`, `task_name`)
+ INDEX `process_name_task_name_index` (`process_name`, `task_name`)
);
-- create workflow approver for newly inlong group and inlong consume.
@@ -624,7 +631,8 @@ CREATE TABLE IF NOT EXISTS `workflow_event_log`
`remark` text COMMENT 'Execution result remark information',
`exception` mediumtext COMMENT 'Exception information',
PRIMARY KEY (`id`),
- INDEX event_group_status_index (`inlong_group_id`, `status`)
+ INDEX event_group_status_index (`inlong_group_id`, `status`),
+ INDEX event_process_task_index (`process_id`, `task_id`)
);
-- ----------------------------
@@ -716,7 +724,7 @@ CREATE TABLE IF NOT EXISTS `sort_source_config`
`topic` varchar(128) DEFAULT '' COMMENT 'Topic',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields, will be saved as JSON type',
PRIMARY KEY (`id`),
- KEY `sort_source_config_index` (`cluster_name`, `task_name`)
+ INDEX `sort_source_config_index` (`cluster_name`, `task_name`)
);
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index bd0416aaa..6a044065e 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -56,7 +56,9 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
- INDEX group_status_deleted_index (`status`, `is_deleted`)
+ INDEX `group_status_deleted_index` (`status`, `is_deleted`),
+ INDEX `group_modify_time_index` (`modify_time`),
+ INDEX `group_cluster_tag_index` (`inlong_cluster_tag`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong group table';
@@ -72,8 +74,8 @@ CREATE TABLE IF NOT EXISTS `inlong_group_ext`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- KEY `group_id_index` (`inlong_group_id`),
- UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`)
+ UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`),
+ INDEX `group_ext_group_index` (`inlong_group_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong group extension table';
@@ -123,7 +125,8 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`),
+ INDEX `cluster_type_index` (`type`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster table';
@@ -262,7 +265,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_cmd_config`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`result_info` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`),
- KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
+ UNIQUE KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
@@ -297,7 +300,8 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`),
+ INDEX `stream_group_id_index` (`inlong_group_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream table';
@@ -315,7 +319,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_inlong_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
- KEY `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
+ INDEX `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream extension table';
@@ -339,7 +343,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `field_stream_id_index` (`inlong_stream_id`)
+ INDEX `stream_field_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='File/DB data source field table';
@@ -396,9 +400,9 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
- KEY `source_status_index` (`status`, `is_deleted`),
- KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`),
- KEY `template_id_index` (`template_id`)
+ INDEX `source_status_index` (`status`, `is_deleted`),
+ INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`),
+ INDEX `source_template_id_index` (`template_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
@@ -465,8 +469,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `source_id` int(11) NOT NULL COMMENT 'Sink id',
- `source_type` varchar(15) NOT NULL COMMENT 'Sink type',
+ `source_id` int(11) NOT NULL COMMENT 'Source id',
+ `source_type` varchar(15) NOT NULL COMMENT 'Source type',
`field_name` varchar(120) NOT NULL COMMENT 'field name',
`field_value` varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
`pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
@@ -478,7 +482,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `source_id_index` (`source_id`)
+ INDEX `source_id_index` (`source_id`),
+ INDEX `source_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
@@ -506,7 +511,8 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
-- The source node name of the transport field
`origin_field_name` varchar(50) DEFAULT '' COMMENT 'Origin field name before transform operation',
PRIMARY KEY (`id`),
- KEY `transform_id_index` (`transform_id`)
+ INDEX `transform_id_index` (`transform_id`),
+ INDEX `transform_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform field table';
@@ -533,7 +539,9 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
`origin_field_name` varchar(50) DEFAULT '' COMMENT 'Origin field name before transform operation',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ INDEX `sink_id_index` (`sink_id`),
+ INDEX `sink_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink field table';
@@ -628,7 +636,7 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
`version` int(11) NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
PRIMARY KEY (`id`),
- KEY `process_name_task_name_index` (`process_name`, `task_name`)
+ INDEX `process_name_task_name_index` (`process_name`, `task_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow approver table';
@@ -661,7 +669,8 @@ CREATE TABLE IF NOT EXISTS `workflow_event_log`
`remark` text COMMENT 'Execution result remark information',
`exception` mediumtext COMMENT 'Exception information',
PRIMARY KEY (`id`),
- INDEX event_group_status_index (`inlong_group_id`, `status`)
+ INDEX event_group_status_index (`inlong_group_id`, `status`),
+ INDEX event_process_task_index (`process_id`, `task_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow event log table';
@@ -757,7 +766,7 @@ CREATE TABLE IF NOT EXISTS `sort_source_config`
`topic` varchar(128) DEFAULT '' COMMENT 'Topic',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields, will be saved as JSON type',
PRIMARY KEY (`id`),
- KEY `sort_source_config_index` (`cluster_name`, `task_name`)
+ INDEX `sort_source_config_index` (`cluster_name`, `task_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Sort source config table';
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index bf0030587..2a3f4e767 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -72,3 +72,13 @@ audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
audit.ck.username=default
# ClickHouse password
audit.ck.password=
+
+# Database cleansing
+# If turned on, logically deleted data will be collected and permanently deleted periodically
+data.cleansing.enabled=false
+# The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 hour)
+data.cleansing.interval.seconds=1800
+# Select the data whose latest modify time is some days before, default is 10 days
+data.cleansing.before.days=10
+# The maximum size of data to be deleted in batch, default is 100
+data.cleansing.batchSize=100
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 8c5fc1d79..a0f785e40 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -70,3 +70,13 @@ audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
audit.ck.username=default
# ClickHouse password
audit.ck.password=
+
+# Database cleansing
+# If turned on, logically deleted data will be collected and permanently deleted periodically
+data.cleansing.enabled=false
+# The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 hour)
+data.cleansing.interval.seconds=1800
+# Select the data whose latest modify time is some days before, default is 10 days
+data.cleansing.before.days=10
+# The maximum size of data to be deleted in batch, default is 100
+data.cleansing.batchSize=100
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index d8f927a54..28cd6a856 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -71,3 +71,13 @@ audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
audit.ck.username=default
# ClickHouse password
audit.ck.password=
+
+# Database cleansing
+# If turned on, logically deleted data will be collected and permanently deleted periodically
+data.cleansing.enabled=false
+# The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 hour)
+data.cleansing.interval.seconds=1800
+# Select the data whose latest modify time is some days before, default is 10 days
+data.cleansing.before.days=10
+# The maximum size of data to be deleted in batch, default is 100
+data.cleansing.batchSize=100
diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties
index 91d6eacf8..796896f23 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -15,7 +15,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
server.host=127.0.0.1
server.port=8083
@@ -58,9 +57,9 @@ inlong.auth.type=default
inlong.encrypt.version=1
inlong.encrypt.key.value1="I!N@L#O$N%G^"
-# clients (e.g. agent and dataproxy) must be authenticated by secretId and secretKey if turned on
+# Clients (e.g. agent and dataproxy) must be authenticated by secretId and secretKey if turned on
openapi.auth.enabled=false
-# audit view by role, see audit id definitions: https://inlong.apache.org/docs/modules/audit/overview#audit-id
+# Audit view by role, see audit id definitions: https://inlong.apache.org/docs/modules/audit/overview#audit-id
audit.admin.ids=3,4,5,6,7,8
audit.user.ids=3,4,5,6,7,8