You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/20 04:05:38 UTC

[inlong] branch master updated: [INLONG-6151][Manager] Add data cleansing task and optimize query indexes (#6168)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new eb24f3197 [INLONG-6151][Manager] Add data cleansing task and optimize query indexes (#6168)
eb24f3197 is described below

commit eb24f31970560bc2d55663fd37a49a99fa46f1f5
Author: woofyzhao <49...@qq.com>
AuthorDate: Thu Oct 20 12:05:32 2022 +0800

    [INLONG-6151][Manager] Add data cleansing task and optimize query indexes (#6168)
    
    * Add optional data cleansing to control storage size
    
    * Add query indexes
    
    * Change the config naming and optimize the select performance
    
    * Add default environment variables and boost script processing
    
    Co-authored-by: healchow <he...@gmail.com>
---
 inlong-audit/sql/apache_inlong_audit.sql           |   2 +-
 .../dao/mapper/InlongGroupEntityMapper.java        |  22 ++-
 .../dao/mapper/InlongGroupExtEntityMapper.java     |   9 +-
 .../dao/mapper/InlongStreamEntityMapper.java       |   8 +-
 .../dao/mapper/InlongStreamExtEntityMapper.java    |   8 +-
 .../dao/mapper/InlongStreamFieldEntityMapper.java  |   9 +-
 .../manager/dao/mapper/StreamSinkEntityMapper.java |  11 +-
 .../dao/mapper/StreamSinkFieldEntityMapper.java    |  22 +--
 .../dao/mapper/StreamSourceEntityMapper.java       |  11 +-
 .../dao/mapper/StreamSourceFieldEntityMapper.java  |   9 +-
 .../dao/mapper/StreamTransformEntityMapper.java    |   9 +-
 .../mapper/StreamTransformFieldEntityMapper.java   |  29 ++--
 .../dao/mapper/WorkflowEventLogEntityMapper.java   |   8 ++
 .../dao/mapper/WorkflowProcessEntityMapper.java    |  13 ++
 .../dao/mapper/WorkflowTaskEntityMapper.java       |   7 +
 .../resources/mappers/InlongGroupEntityMapper.xml  |  18 ++-
 .../mappers/InlongGroupExtEntityMapper.xml         |  11 +-
 .../resources/mappers/InlongStreamEntityMapper.xml |  12 +-
 .../mappers/InlongStreamExtEntityMapper.xml        |  16 +--
 .../mappers/InlongStreamFieldEntityMapper.xml      |   9 +-
 .../resources/mappers/StreamSinkEntityMapper.xml   |  12 +-
 .../mappers/StreamSinkFieldEntityMapper.xml        |   8 ++
 .../resources/mappers/StreamSourceEntityMapper.xml |   8 ++
 .../mappers/StreamSourceFieldEntityMapper.xml      |   8 ++
 .../mappers/StreamTransformEntityMapper.xml        |  10 +-
 .../mappers/StreamTransformFieldEntityMapper.xml   |  19 ++-
 .../mappers/WorkflowEventLogEntityMapper.xml       |   9 ++
 .../mappers/WorkflowProcessEntityMapper.xml        |  17 +++
 .../resources/mappers/WorkflowTaskEntityMapper.xml |   8 ++
 inlong-manager/manager-docker/Dockerfile           |   1 +
 inlong-manager/manager-docker/manager-docker.sh    |   5 +
 .../repository/DataProxyConfigRepository.java      |   2 +-
 .../service/sink/StreamSinkServiceImpl.java        |   2 +-
 .../manager/service/task/DataCleansingTask.java    | 153 +++++++++++++++++++++
 .../repository/DataProxyConfigRepositoryTest.java  |   2 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  50 ++++---
 .../manager-web/sql/apache_inlong_manager.sql      |  47 ++++---
 .../src/main/resources/application-dev.properties  |  10 ++
 .../src/main/resources/application-prod.properties |  10 ++
 .../src/main/resources/application-test.properties |  10 ++
 .../src/main/resources/application.properties      |   5 +-
 41 files changed, 526 insertions(+), 113 deletions(-)

diff --git a/inlong-audit/sql/apache_inlong_audit.sql b/inlong-audit/sql/apache_inlong_audit.sql
index 5ea62c4b4..88784c49b 100644
--- a/inlong-audit/sql/apache_inlong_audit.sql
+++ b/inlong-audit/sql/apache_inlong_audit.sql
@@ -46,6 +46,6 @@ CREATE TABLE IF NOT EXISTS `audit_data`
     `size`             BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message size',
     `delay`            BIGINT       NOT NULL DEFAULT '0' COMMENT 'Message delay count',
     `update_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
-    INDEX ip_packet (`ip`, `inlong_group_id`, `inlong_stream_id`, `log_ts`)
+    INDEX group_stream_audit_id (`inlong_group_id`, `inlong_stream_id`, `audit_id`, `log_ts`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8 COMMENT ='Inlong audit data table';
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
index b54fba22f..b9ba70058 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
 import org.springframework.stereotype.Repository;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
@@ -53,6 +54,18 @@ public interface InlongGroupEntityMapper {
      */
     List<SortSourceGroupInfo> selectAllGroups();
 
+    /**
+     * Select the ids of all groups that were logically deleted before the specified last modify time.
+     * <p>
+     * Note: every returned group id must be fully deleted;
+     * a group id that still has an undeleted record (is_deleted=0) must not be returned.
+     *
+     * @param timeBefore the latest modify time before which to select
+     * @param limit max item count
+     * @return all matched group ids
+     */
+    List<String> selectDeletedGroupIds(@Param("timeBefore") Date timeBefore, @Param("limit") Integer limit);
+
     int updateByPrimaryKey(InlongGroupEntity record);
 
     int updateByIdentifierSelective(InlongGroupEntity record);
@@ -62,4 +75,11 @@ public interface InlongGroupEntityMapper {
 
     int deleteByPrimaryKey(Integer id);
 
-}
\ No newline at end of file
+    /**
+     * Physically delete all inlong groups based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
index 48ef53044..7edd00626 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
@@ -66,4 +66,11 @@ public interface InlongGroupExtEntityMapper {
      */
     int logicDeleteAllByGroupId(String groupId);
 
-}
\ No newline at end of file
+    /**
+     * Physically delete all extended fields based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
index 006f26c4e..66f101a27 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
@@ -62,13 +62,13 @@ public interface InlongStreamEntityMapper {
      */
     void logicDeleteDlqOrRlq(String groupId, String streamId, String operator);
 
-    int deleteByPrimaryKey(Integer id);
+    int deleteById(Integer id);
 
     /**
-     * Physically delete all inlong streams of the specified inlong group id
+     * Physically delete all inlong streams based on inlong group ids
      *
      * @return rows deleted
      */
-    int deleteAllByGroupId(@Param("groupId") String groupId);
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
 
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
index 94b65419e..f59e9df94 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
@@ -53,13 +53,13 @@ public interface InlongStreamExtEntityMapper {
      */
     int logicDeleteAllByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
-    int deleteByPrimaryKey(Integer id);
+    int deleteById(Integer id);
 
     /**
-     * Physically delete all extension fields based on group id and stream id
+     * Physically delete all extension fields based on inlong group ids
      *
      * @return rows deleted
      */
-    int deleteAllByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
 
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
index 9a5465a06..e66b4987d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamFieldEntityMapper.java
@@ -47,4 +47,11 @@ public interface InlongStreamFieldEntityMapper {
      */
     int deleteAllByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
-}
\ No newline at end of file
+    /**
+     * Physically delete all stream fields based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 2662e9c0a..7142976dc 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -142,6 +142,13 @@ public interface StreamSinkEntityMapper {
 
     int updateStatus(StreamSinkEntity entity);
 
-    int deleteByPrimaryKey(Integer id);
+    int deleteById(Integer id);
 
-}
\ No newline at end of file
+    /**
+     * Physically delete all stream sinks based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
index a75027b4d..b487deda7 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
@@ -38,24 +38,30 @@ public interface StreamSinkFieldEntityMapper {
      * According to the sink id, query the sink field.
      *
      * @param sinkId sink id.
-     * @return Sink field list.
+     * @return sink field list.
      */
     List<StreamSinkFieldEntity> selectBySinkId(@Param("sinkId") Integer sinkId);
 
     /**
-     * According to the sink id, logically delete the corresponding field information.
+     * Logically delete all stream sink fields based on sink id
      *
-     * @param sinkId sink id.
-     * @return rows deleted.
+     * @param sinkId sink id
+     * @return rows deleted
      */
     int logicDeleteAll(@Param("sinkId") Integer sinkId);
 
     /**
-     * According to the sink id, physically delete the corresponding field information
+     * Physically delete all stream sink fields based on sink id
      *
-     * @param sinkId sink id.
-     * @return rows deleted.
+     * @return rows deleted
      */
     int deleteAll(@Param("sinkId") Integer sinkId);
 
-}
\ No newline at end of file
+    /**
+     * Physically delete all stream sink fields based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 8cc63e4e3..6e851e11c 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -130,8 +130,15 @@ public interface StreamSourceEntityMapper {
     int updateSnapshot(StreamSourceEntity entity);
 
     /**
-     * Physical delete stream sources.
+     * Physically delete stream sources by group id and stream id
      */
     int deleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
-}
\ No newline at end of file
+    /**
+     * Physically delete all stream sources based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
index de5960ef3..0059e8362 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
@@ -59,4 +59,11 @@ public interface StreamSourceFieldEntityMapper {
      */
     int deleteAll(@Param("sourceId") Integer sourceId);
 
-}
\ No newline at end of file
+    /**
+     * Physically delete all stream source fields based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
index c4bc4ca1a..a087624a0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
@@ -41,4 +41,11 @@ public interface StreamTransformEntityMapper {
 
     int deleteById(Integer id);
 
-}
\ No newline at end of file
+    /**
+     * Physically delete all stream transforms based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformFieldEntityMapper.java
index a38885440..8e542be17 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformFieldEntityMapper.java
@@ -26,25 +26,17 @@ import java.util.List;
 @Repository
 public interface StreamTransformFieldEntityMapper {
 
-    int deleteByPrimaryKey(Integer id);
-
     int insert(StreamTransformFieldEntity record);
 
     int insertSelective(StreamTransformFieldEntity record);
 
     /**
-     * Selete undeleted transform field by transform id.
-     *
-     * @param transformId
-     * @return
+     * Select undeleted transform field by transform id.
      */
     List<StreamTransformFieldEntity> selectByTransformId(@Param("transformId") Integer transformId);
 
     /**
-     * Selete undeleted transform field by transform ids.
-     *
-     * @param transformIds
-     * @return
+     * Select undeleted transform field by transform ids.
      */
     List<StreamTransformFieldEntity> selectByTransformIds(@Param("transformIds") List<Integer> transformIds);
 
@@ -56,16 +48,21 @@ public interface StreamTransformFieldEntityMapper {
 
     /**
      * Insert all field list
-     *
-     * @param fieldList
      */
     void insertAll(@Param("list") List<StreamTransformFieldEntity> fieldList);
 
+    int deleteById(Integer id);
+
     /**
      * Delete all field list by transformId
-     *
-     * @param transformId
-     * @return
      */
     int deleteAll(@Param("transformId") Integer transformId);
-}
\ No newline at end of file
+
+    /**
+     * Physically delete all stream transform fields based on inlong group ids
+     *
+     * @return rows deleted
+     */
+    int deleteByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowEventLogEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowEventLogEntityMapper.java
index 2927601fb..f17d80d1b 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowEventLogEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowEventLogEntityMapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
+import org.apache.ibatis.annotations.Param;
 import org.apache.inlong.manager.dao.entity.WorkflowEventLogEntity;
 import org.apache.inlong.manager.pojo.workflow.EventLogRequest;
 import org.springframework.stereotype.Repository;
@@ -37,4 +38,11 @@ public interface WorkflowEventLogEntityMapper {
 
     int update(WorkflowEventLogEntity record);
 
+    /**
+     * Physically delete all event logs based on process ids
+     *
+     * @return rows deleted
+     */
+    int deleteByProcessIds(@Param("processIdList") List<Integer> processIdList);
+
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowProcessEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowProcessEntityMapper.java
index 2834bf47a..9a8080c9a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowProcessEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowProcessEntityMapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
+import org.apache.ibatis.annotations.Param;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.pojo.common.CountInfo;
 import org.apache.inlong.manager.pojo.workflow.ProcessCountRequest;
@@ -37,8 +38,20 @@ public interface WorkflowProcessEntityMapper {
 
     List<WorkflowProcessEntity> selectByCondition(ProcessRequest query);
 
+    /**
+     * Select process ids based on inlong group id list
+     */
+    List<Integer> selectByInlongGroupIds(@Param("groupIdList") List<String> groupIdList);
+
     List<CountInfo> countByQuery(ProcessCountRequest query);
 
     void update(WorkflowProcessEntity workflowProcessEntity);
 
+    /**
+     * Physically delete all process infos based on process ids
+     *
+     * @return rows deleted
+     */
+    int deleteByProcessIds(@Param("processIdList") List<Integer> processIdList);
+
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowTaskEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowTaskEntityMapper.java
index 90fab1cac..786ecf0e2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowTaskEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/WorkflowTaskEntityMapper.java
@@ -48,4 +48,11 @@ public interface WorkflowTaskEntityMapper {
 
     int update(WorkflowTaskEntity workflowTaskEntity);
 
+    /**
+     * Physically delete all task infos based on process ids
+     *
+     * @return rows deleted
+     */
+    int deleteByProcessIds(@Param("processIdList") List<Integer> processIdList);
+
 }
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index f2b2b29e0..e3b832339 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -208,6 +208,14 @@
         from inlong_group
         where is_deleted = 0
     </select>
+    <select id="selectDeletedGroupIds" resultType="java.lang.String">
+        <!-- Filter in HAVING, not WHERE: a live row modified after timeBefore must still exclude its group id -->
+        select inlong_group_id
+        from inlong_group
+        group by inlong_group_id
+        having min(is_deleted) > 0 and max(modify_time) &lt;= #{timeBefore, jdbcType=TIMESTAMP}
+        limit #{limit, jdbcType=INTEGER}
+    </select>
 
     <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
         update inlong_group
@@ -317,4 +325,12 @@
         from inlong_group
         where id = #{id,jdbcType=INTEGER}
     </delete>
-</mapper>
\ No newline at end of file
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from inlong_group
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
index 4ac134c6f..7093ccb89 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
@@ -150,4 +150,13 @@
         where inlong_group_id = #{groupId, jdbcType=VARCHAR}
           and is_deleted = 0
     </update>
-</mapper>
\ No newline at end of file
+
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from inlong_group_ext
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
index f6b3a549a..136c33f12 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
@@ -413,15 +413,17 @@
           and is_deleted = 0
     </update>
 
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    <delete id="deleteById" parameterType="java.lang.Integer">
         delete
         from inlong_stream
         where id = #{id,jdbcType=INTEGER}
     </delete>
-    <delete id="deleteAllByGroupId">
+    <delete id="deleteByInlongGroupIds">
         delete
         from inlong_stream
-        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
-          and is_deleted = 0
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
     </delete>
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
index efa7c726e..7e62e6e35 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
@@ -99,19 +99,17 @@
         </if>
     </update>
 
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    <delete id="deleteById" parameterType="java.lang.Integer">
         delete
         from inlong_stream_ext
         where id = #{id,jdbcType=INTEGER}
     </delete>
-
-    <delete id="deleteAllByRelatedId">
+    <delete id="deleteByInlongGroupIds">
         delete
         from inlong_stream_ext
-        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
-        <if test="streamId != null and streamId != ''">
-            and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
-        </if>
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
     </delete>
-
-</mapper>
\ No newline at end of file
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
index 75f389ee0..de08a9aa6 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamFieldEntityMapper.xml
@@ -125,5 +125,12 @@
           and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
           and is_deleted = 0
     </delete>
-
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from inlong_stream_field
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
 </mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 12e3bc6c0..c4af01696 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -439,9 +439,17 @@
         where id = #{id,jdbcType=INTEGER}
     </update>
 
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    <delete id="deleteById" parameterType="java.lang.Integer">
         delete
         from stream_sink
         where id = #{id,jdbcType=INTEGER}
     </delete>
-</mapper>
\ No newline at end of file
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from stream_sink
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 052b46556..8c85171ff 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -132,4 +132,12 @@
         from stream_sink_field
         where sink_id = #{sinkId,jdbcType=INTEGER}
     </delete>
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from stream_sink_field
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
 </mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 07bb4e8d4..7b1d21071 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -401,4 +401,12 @@
             and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
         </if>
     </delete>
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from stream_source
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
 </mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index 0bf9acd9b..f4ba00366 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -258,4 +258,12 @@
         from stream_source_field
         where source_id = #{sourceId,jdbcType=INTEGER}
     </delete>
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from stream_source_field
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
 </mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
index 65c299556..302e5f81f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
@@ -202,4 +202,12 @@
         from stream_transform
         where id = #{id,jdbcType=INTEGER}
     </delete>
-</mapper>
\ No newline at end of file
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from stream_transform
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
+</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
index f35c28675..25147dbf2 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
@@ -67,11 +67,7 @@
         from stream_transform_field
         where id = #{id,jdbcType=INTEGER}
     </select>
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-        delete
-        from stream_transform_field
-        where id = #{id,jdbcType=INTEGER}
-    </delete>
+
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
         insert into stream_transform_field (id, inlong_group_id, inlong_stream_id,
@@ -299,9 +295,22 @@
         </foreach>
     </insert>
 
+    <delete id="deleteById" parameterType="java.lang.Integer">
+        delete
+        from stream_transform_field
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
     <delete id="deleteAll">
         delete
         from stream_transform_field
         where transform_id = #{transformId,jdbcType=INTEGER}
     </delete>
+    <delete id="deleteByInlongGroupIds">
+        delete
+        from stream_transform_field
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
 </mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowEventLogEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowEventLogEntityMapper.xml
index 97dad343b..3147b1f74 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowEventLogEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowEventLogEntityMapper.xml
@@ -136,4 +136,13 @@
             remark    = #{remark,jdbcType=LONGVARCHAR}
         where id = #{id,jdbcType=INTEGER}
     </update>
+
+    <delete id="deleteByProcessIds">
+        delete
+        from workflow_event_log
+        where process_id in
+        <foreach item="item" index="index" collection="processIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
 </mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
index 3a46eef18..1969918b4 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowProcessEntityMapper.xml
@@ -111,6 +111,14 @@
         </where>
         order by id desc
     </select>
+    <select id="selectByInlongGroupIds" resultType="java.lang.Integer">
+        select id
+        from workflow_process
+        where inlong_group_id in
+        <foreach item="item" index="index" collection="groupIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </select>
 
     <select id="countByQuery" parameterType="org.apache.inlong.manager.pojo.workflow.ProcessCountRequest"
             resultType="org.apache.inlong.manager.pojo.common.CountInfo">
@@ -155,4 +163,13 @@
             ext_params = #{extParams,jdbcType=LONGVARCHAR}
         where id = #{id,jdbcType=INTEGER}
     </update>
+
+    <delete id="deleteByProcessIds">
+        delete
+        from workflow_process
+        where id in
+        <foreach item="item" index="index" collection="processIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
 </mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml
index 8746164fd..84294c128 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/WorkflowTaskEntityMapper.xml
@@ -187,4 +187,12 @@
         where id = #{id,jdbcType=INTEGER}
     </update>
 
+    <delete id="deleteByProcessIds">
+        delete
+        from workflow_task
+        where process_id in
+        <foreach item="item" index="index" collection="processIdList" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+    </delete>
 </mapper>
diff --git a/inlong-manager/manager-docker/Dockerfile b/inlong-manager/manager-docker/Dockerfile
index c12a6d82f..405b3e533 100644
--- a/inlong-manager/manager-docker/Dockerfile
+++ b/inlong-manager/manager-docker/Dockerfile
@@ -27,6 +27,7 @@ ENV JDBC_URL=127.0.0.1:3306
 ENV USERNAME=root
 ENV PASSWORD=inlong
 ENV ZK_URL=127.0.0.1:2181
+ENV CLEANSING_ENABLE=false
 # support download plugins from remote address.
 ENV PLUGINS_URL=default
 # for flink-sort-plugin.properties
diff --git a/inlong-manager/manager-docker/manager-docker.sh b/inlong-manager/manager-docker/manager-docker.sh
index 7167a97d2..924f07ea9 100644
--- a/inlong-manager/manager-docker/manager-docker.sh
+++ b/inlong-manager/manager-docker/manager-docker.sh
@@ -33,6 +33,11 @@ sed -i "s/spring.profiles.active=.*$/spring.profiles.active=${ACTIVE_PROFILE}/g"
 sed -i "s/127.0.0.1:3306/${JDBC_URL}/g" "${conf_file}"
 sed -i "s/datasource.druid.username=.*$/datasource.druid.username=${USERNAME}/g" "${conf_file}"
 sed -i "s/datasource.druid.password=.*$/datasource.druid.password=${PASSWORD}/g" "${conf_file}"
+# for data cleansing, unset variables fall back to the defaults declared in DataCleansingTask
+sed -i "s/data.cleansing.enabled=.*$/data.cleansing.enabled=${CLEANSING_ENABLE:-false}/g" "${conf_file}"
+sed -i "s/data.cleansing.interval.seconds=.*$/data.cleansing.interval.seconds=${CLEANSING_INTERVAL:-1800}/g" "${conf_file}"
+sed -i "s/data.cleansing.before.days=.*$/data.cleansing.before.days=${CLEANSING_BEFORE_DAYS:-10}/g" "${conf_file}"
+sed -i "s/data.cleansing.batchSize=.*$/data.cleansing.batchSize=${CLEANSING_BATCHSIZE:-100}/g" "${conf_file}"
 # for audit data
 sed -i "s/audit.ck.jdbcUrl=.*$/audit.ck.jdbcUrl=${AUDIT_CK_URL}/g" "${conf_file}"
 sed -i "s/audit.ck.username=.*$/audit.ck.username=${AUDIT_CK_USERNAME}/g" "${conf_file}"
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 5c447e4ef..d49eb9a34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -661,7 +661,7 @@ public class DataProxyConfigRepository implements IRepository {
         }
         // delete old stream sink
         deleteStreamSinks.forEach((v) -> {
-            streamSinkMapper.deleteByPrimaryKey(v.getId());
+            streamSinkMapper.deleteById(v.getId());
         });
         return inlongGroupId;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4e6c81278..1ae1e9498 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -384,7 +384,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isNotEmpty(entityList)) {
             entityList.forEach(entity -> {
-                sinkMapper.deleteByPrimaryKey(entity.getId());
+                sinkMapper.deleteById(entity.getId());
                 sinkFieldMapper.deleteAll(entity.getId());
             });
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
new file mode 100644
index 000000000..bb811c285
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.task;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamTransformFieldEntityMapper;
+import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
+import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
+import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Periodic task that permanently deletes the data belonging to a batch of Inlong groups.
+ *
+ * <p>The task is scheduled only when {@code data.cleansing.enabled} is {@code true}. Each run calls
+ * {@code InlongGroupEntityMapper#selectDeletedGroupIds} with midnight of
+ * {@code data.cleansing.before.days} days ago and {@code data.cleansing.batchSize}, then deletes the
+ * rows that reference the returned group ids from the configuration and workflow tables.
+ */
+@Slf4j
+@Service
+public class DataCleansingTask extends TimerTask implements InitializingBean, DisposableBean {
+
+    /**
+     * The execution starts after this delay in seconds.
+     */
+    private static final int INITIAL_DELAY = 60;
+
+    @Value("${data.cleansing.enabled:false}")
+    private Boolean enabled;
+    @Value("${data.cleansing.interval.seconds:1800}")
+    private Integer interval;
+    @Value("${data.cleansing.before.days:10}")
+    private Integer before;
+    @Value("${data.cleansing.batchSize:100}")
+    private Integer batchSize;
+
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private InlongGroupExtEntityMapper groupExtMapper;
+    @Autowired
+    private InlongStreamEntityMapper streamMapper;
+    @Autowired
+    private InlongStreamExtEntityMapper streamExtMapper;
+    @Autowired
+    private InlongStreamFieldEntityMapper streamFieldMapper;
+    @Autowired
+    private StreamSinkEntityMapper streamSinkMapper;
+    @Autowired
+    private StreamSinkFieldEntityMapper streamSinkFieldMapper;
+    @Autowired
+    private StreamSourceEntityMapper streamSourceMapper;
+    @Autowired
+    private StreamSourceFieldEntityMapper streamSourceFieldMapper;
+    @Autowired
+    private StreamTransformEntityMapper streamTransformMapper;
+    @Autowired
+    private StreamTransformFieldEntityMapper streamTransformFieldMapper;
+
+    @Autowired
+    private WorkflowProcessEntityMapper workflowProcessMapper;
+    @Autowired
+    private WorkflowTaskEntityMapper workflowTaskMapper;
+    @Autowired
+    private WorkflowEventLogEntityMapper workflowEventLogMapper;
+
+    /**
+     * Executor that runs this task; stays {@code null} while cleansing is disabled.
+     */
+    private ScheduledExecutorService executor;
+
+    /**
+     * Schedules this task with a fixed delay of {@code interval} seconds when cleansing is enabled.
+     */
+    @Override
+    public void afterPropertiesSet() {
+        // Boolean.TRUE.equals avoids an unboxing NPE if the property resolved to null
+        if (!Boolean.TRUE.equals(enabled)) {
+            return;
+        }
+        log.info("start data cleansing timer task");
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleWithFixedDelay(this, INITIAL_DELAY, interval, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Stops scheduling further runs so the executor thread does not outlive this bean.
+     */
+    @Override
+    public void destroy() {
+        if (executor != null) {
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Runs one cleansing batch. Any exception is logged and swallowed so that the periodic
+     * schedule is not cancelled by a failed run.
+     */
+    @Override
+    public void run() {
+        try {
+            // midnight of the day `before` days ago
+            Calendar calendar = Calendar.getInstance();
+            calendar.set(Calendar.HOUR_OF_DAY, 0);
+            calendar.set(Calendar.MINUTE, 0);
+            calendar.set(Calendar.SECOND, 0);
+            calendar.set(Calendar.MILLISECOND, 0);
+            calendar.add(Calendar.DAY_OF_MONTH, -before);
+
+            Date daysBefore = calendar.getTime();
+            List<String> groupIds = groupMapper.selectDeletedGroupIds(daysBefore, batchSize);
+            if (CollectionUtils.isEmpty(groupIds)) {
+                return;
+            }
+
+            log.info("begin to delete data before {}, group ids {}", daysBefore, groupIds);
+
+            // cleanse dependent configuration data first
+            groupExtMapper.deleteByInlongGroupIds(groupIds);
+            streamMapper.deleteByInlongGroupIds(groupIds);
+            streamExtMapper.deleteByInlongGroupIds(groupIds);
+            streamFieldMapper.deleteByInlongGroupIds(groupIds);
+
+            streamSinkMapper.deleteByInlongGroupIds(groupIds);
+            streamSinkFieldMapper.deleteByInlongGroupIds(groupIds);
+            streamSourceMapper.deleteByInlongGroupIds(groupIds);
+            streamSourceFieldMapper.deleteByInlongGroupIds(groupIds);
+            streamTransformMapper.deleteByInlongGroupIds(groupIds);
+            streamTransformFieldMapper.deleteByInlongGroupIds(groupIds);
+
+            // cleanse workflow data, tasks and event logs before the processes they reference;
+            // skip when there are none, because an empty list would render an invalid `in` clause
+            List<Integer> processIds = workflowProcessMapper.selectByInlongGroupIds(groupIds);
+            if (CollectionUtils.isNotEmpty(processIds)) {
+                workflowTaskMapper.deleteByProcessIds(processIds);
+                workflowEventLogMapper.deleteByProcessIds(processIds);
+                workflowProcessMapper.deleteByProcessIds(processIds);
+            }
+
+            // delete the group rows last: NOTE(review) selectDeletedGroupIds presumably reads them,
+            // so a failure above leaves the batch selectable and it is retried on the next run
+            groupMapper.deleteByInlongGroupIds(groupIds);
+
+            log.info("success to delete data before {}, group ids size: {}", daysBefore, groupIds.size());
+        } catch (Exception e) {
+            log.error("exception while cleansing data from db: ", e);
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
index fc726976b..87959e5bb 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepositoryTest.java
@@ -149,7 +149,7 @@ public class DataProxyConfigRepositoryTest {
         StreamSinkEntityMapper mapper = PowerMockito.mock(StreamSinkEntityMapper.class);
         PowerMockito.when(mapper.selectByCondition(any())).thenReturn(streamSinks);
         PowerMockito.when(mapper.insert(any())).thenReturn(1);
-        PowerMockito.when(mapper.deleteByPrimaryKey(anyInt())).thenReturn(1);
+        PowerMockito.when(mapper.deleteById(anyInt())).thenReturn(1);
         return mapper;
     }
 
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 91b731723..5fe7e6076 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -50,7 +50,9 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `version`                int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
-    INDEX group_status_deleted_index (`status`, `is_deleted`)
+    INDEX `group_status_deleted_index` (`status`, `is_deleted`),
+    INDEX `group_modify_time_index` (`modify_time`),
+    INDEX `group_cluster_tag_index` (`inlong_cluster_tag`)
 );
 
 -- ----------------------------
@@ -65,8 +67,8 @@ CREATE TABLE IF NOT EXISTS `inlong_group_ext`
     `is_deleted`      int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `modify_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    KEY `group_id_index` (`inlong_group_id`),
-    UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`)
+    UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`),
+    INDEX `group_ext_group_index` (`inlong_group_id`)
 );
 
 -- ----------------------------
@@ -114,7 +116,8 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster`
     `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `version`      int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`),
+    INDEX `cluster_type_index` (`type`)
 );
 
 -- ----------------------------
@@ -232,8 +235,7 @@ CREATE TABLE IF NOT EXISTS `consumption_pulsar`
     `dead_letter_topic`  varchar(256) DEFAULT NULL COMMENT 'dead letter topic name',
     `is_deleted`         int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`)
-) COMMENT ='Pulsar consumption table';
-
+);
 -- ----------------------------
 -- Table structure for stream_source_cmd_config
 -- ----------------------------
@@ -248,7 +250,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_cmd_config`
     `modify_time`         timestamp   NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `result_info`         varchar(64)      DEFAULT NULL,
     PRIMARY KEY (`id`),
-    KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
+    UNIQUE KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
 );
 
 -- ----------------------------
@@ -282,7 +284,8 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `version`          int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`),
+    INDEX `stream_group_id_index` (`inlong_group_id`)
 );
 
 -- ----------------------------
@@ -299,7 +302,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
     `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_inlong_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
-    KEY `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
+    INDEX `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -322,7 +325,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY `field_stream_id_index` (`inlong_stream_id`)
+    INDEX `stream_field_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -377,9 +380,9 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
-    KEY `source_status_index` (`status`, `is_deleted`),
-    KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`),
-    KEY `template_id_index` (`template_id`)
+    INDEX `source_status_index` (`status`, `is_deleted`),
+    INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`),
+    INDEX `source_template_id_index` (`template_id`)
 );
 
 -- ----------------------------
@@ -443,8 +446,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
     `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
     `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `source_id`        int(11)      NOT NULL COMMENT 'Sink id',
-    `source_type`      varchar(15)  NOT NULL COMMENT 'Sink type',
+    `source_id`        int(11)      NOT NULL COMMENT 'Source id',
+    `source_type`      varchar(15)  NOT NULL COMMENT 'Source type',
     `field_name`       varchar(120) NOT NULL COMMENT 'field name',
     `field_value`      varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
     `pre_expression`   varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
@@ -456,7 +459,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
     `rank_num`         smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`       int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY `source_id_index` (`source_id`)
+    INDEX `source_id_index` (`source_id`),
+    INDEX `source_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -483,7 +487,8 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
     -- The source node name of the transport field
     `origin_field_name` varchar(50)   DEFAULT '' COMMENT 'Origin field name before transform operation',
     PRIMARY KEY (`id`),
-    KEY `transform_id_index` (`transform_id`)
+    INDEX `transform_id_index` (`transform_id`),
+    INDEX `transform_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -509,7 +514,9 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
     `origin_field_name` varchar(50)   DEFAULT '' COMMENT 'Origin field name before transform operation',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    INDEX `sink_id_index` (`sink_id`),
+    INDEX `sink_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -592,7 +599,7 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
     `is_deleted`   int(11)                DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
     `version`      int(11)       NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
-    KEY `process_name_task_name_index` (`process_name`, `task_name`)
+    INDEX `process_name_task_name_index` (`process_name`, `task_name`)
 );
 
 -- create workflow approver for newly inlong group and inlong consume.
@@ -624,7 +631,8 @@ CREATE TABLE IF NOT EXISTS `workflow_event_log`
     `remark`               text COMMENT 'Execution result remark information',
     `exception`            mediumtext COMMENT 'Exception information',
     PRIMARY KEY (`id`),
-    INDEX event_group_status_index (`inlong_group_id`, `status`)
+    INDEX event_group_status_index (`inlong_group_id`, `status`),
+    INDEX event_process_task_index (`process_id`, `task_id`)
 );
 
 -- ----------------------------
@@ -716,7 +724,7 @@ CREATE TABLE IF NOT EXISTS `sort_source_config`
     `topic`        varchar(128) DEFAULT '' COMMENT 'Topic',
     `ext_params`   mediumtext   DEFAULT NULL COMMENT 'Another fields, will be saved as JSON type',
     PRIMARY KEY (`id`),
-    KEY `sort_source_config_index` (`cluster_name`, `task_name`)
+    INDEX `sort_source_config_index` (`cluster_name`, `task_name`)
 );
 
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index bd0416aaa..6a044065e 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -56,7 +56,9 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `version`                int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
-    INDEX group_status_deleted_index (`status`, `is_deleted`)
+    INDEX `group_status_deleted_index` (`status`, `is_deleted`),
+    INDEX `group_modify_time_index` (`modify_time`),
+    INDEX `group_cluster_tag_index` (`inlong_cluster_tag`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong group table';
 
@@ -72,8 +74,8 @@ CREATE TABLE IF NOT EXISTS `inlong_group_ext`
     `is_deleted`      int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `modify_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    KEY `group_id_index` (`inlong_group_id`),
-    UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`)
+    UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`),
+    INDEX `group_ext_group_index` (`inlong_group_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong group extension table';
 
@@ -123,7 +125,8 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster`
     `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `version`      int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`),
+    INDEX `cluster_type_index` (`type`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster table';
 
@@ -262,7 +265,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_cmd_config`
     `modify_time`         timestamp   NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `result_info`         varchar(64)      DEFAULT NULL,
     PRIMARY KEY (`id`),
-    KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
+    UNIQUE KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8;
 
@@ -297,7 +300,8 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `version`          int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`),
+    INDEX `stream_group_id_index` (`inlong_group_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream table';
 
@@ -315,7 +319,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
     `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_inlong_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
-    KEY `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
+    INDEX `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream extension table';
 
@@ -339,7 +343,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY `field_stream_id_index` (`inlong_stream_id`)
+    INDEX `stream_field_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='File/DB data source field table';
 
@@ -396,9 +400,9 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
-    KEY `source_status_index` (`status`, `is_deleted`),
-    KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`),
-    KEY `template_id_index` (`template_id`)
+    INDEX `source_status_index` (`status`, `is_deleted`),
+    INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`),
+    INDEX `source_template_id_index` (`template_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
 
@@ -465,8 +469,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
     `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
     `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `source_id`        int(11)      NOT NULL COMMENT 'Sink id',
-    `source_type`      varchar(15)  NOT NULL COMMENT 'Sink type',
+    `source_id`        int(11)      NOT NULL COMMENT 'Source id',
+    `source_type`      varchar(15)  NOT NULL COMMENT 'Source type',
     `field_name`       varchar(120) NOT NULL COMMENT 'field name',
     `field_value`      varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
     `pre_expression`   varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
@@ -478,7 +482,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
     `rank_num`         smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`       int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY `source_id_index` (`source_id`)
+    INDEX `source_id_index` (`source_id`),
+    INDEX `source_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
 
@@ -506,7 +511,8 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
     -- The source node name of the transport field
     `origin_field_name` varchar(50)   DEFAULT '' COMMENT 'Origin field name before transform operation',
     PRIMARY KEY (`id`),
-    KEY `transform_id_index` (`transform_id`)
+    INDEX `transform_id_index` (`transform_id`),
+    INDEX `transform_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform field table';
 
@@ -533,7 +539,9 @@ CREATE TABLE IF NOT EXISTS `stream_sink_field`
     `origin_field_name` varchar(50)   DEFAULT '' COMMENT 'Origin field name before transform operation',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    INDEX `sink_id_index` (`sink_id`),
+    INDEX `sink_group_stream_index` (`inlong_group_id`, `inlong_stream_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink field table';
 
@@ -628,7 +636,7 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
     `is_deleted`   int(11)                DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
     `version`      int(11)       NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
     PRIMARY KEY (`id`),
-    KEY `process_name_task_name_index` (`process_name`, `task_name`)
+    INDEX `process_name_task_name_index` (`process_name`, `task_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow approver table';
 
@@ -661,7 +669,8 @@ CREATE TABLE IF NOT EXISTS `workflow_event_log`
     `remark`               text COMMENT 'Execution result remark information',
     `exception`            mediumtext COMMENT 'Exception information',
     PRIMARY KEY (`id`),
-    INDEX event_group_status_index (`inlong_group_id`, `status`)
+    INDEX event_group_status_index (`inlong_group_id`, `status`),
+    INDEX event_process_task_index (`process_id`, `task_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow event log table';
 
@@ -757,7 +766,7 @@ CREATE TABLE IF NOT EXISTS `sort_source_config`
     `topic`        varchar(128) DEFAULT '' COMMENT 'Topic',
     `ext_params`   mediumtext   DEFAULT NULL COMMENT 'Another fields, will be saved as JSON type',
     PRIMARY KEY (`id`),
-    KEY `sort_source_config_index` (`cluster_name`, `task_name`)
+    INDEX `sort_source_config_index` (`cluster_name`, `task_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Sort source config table';
 
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index bf0030587..2a3f4e767 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -72,3 +72,13 @@ audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 audit.ck.username=default
 # ClickHouse password
 audit.ck.password=
+
+# Database cleansing
+# If turned on, logically deleted data will be collected and permanently deleted periodically
+data.cleansing.enabled=false
+# The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 hour)
+data.cleansing.interval.seconds=1800
+# Select only data whose latest modify time is older than this number of days, default is 10 days
+data.cleansing.before.days=10
+# The maximum number of records to delete in one batch, default is 100
+data.cleansing.batchSize=100
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 8c5fc1d79..a0f785e40 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -70,3 +70,13 @@ audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 audit.ck.username=default
 # ClickHouse password
 audit.ck.password=
+
+# Database cleansing
+# If turned on, logically deleted data will be collected and permanently deleted periodically
+data.cleansing.enabled=false
+# The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 hour)
+data.cleansing.interval.seconds=1800
+# Select only data whose latest modify time is older than this number of days, default is 10 days
+data.cleansing.before.days=10
+# The maximum number of records to delete in one batch, default is 100
+data.cleansing.batchSize=100
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index d8f927a54..28cd6a856 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -71,3 +71,13 @@ audit.ck.jdbcUrl=jdbc:clickhouse://127.0.0.1:8123/apache_inlong_audit
 audit.ck.username=default
 # ClickHouse password
 audit.ck.password=
+
+# Database cleansing
+# If turned on, logically deleted data will be collected and permanently deleted periodically
+data.cleansing.enabled=false
+# The interval (in seconds) between the end of one execution and the start of the next, default is 1800s (0.5 hour)
+data.cleansing.interval.seconds=1800
+# Select only data whose latest modify time is older than this number of days, default is 10 days
+data.cleansing.before.days=10
+# The maximum number of records to delete in one batch, default is 100
+data.cleansing.batchSize=100
diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties
index 91d6eacf8..796896f23 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -15,7 +15,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
 
 server.host=127.0.0.1
 server.port=8083
@@ -58,9 +57,9 @@ inlong.auth.type=default
 inlong.encrypt.version=1
 inlong.encrypt.key.value1="I!N@L#O$N%G^"
 
-# clients (e.g. agent and dataproxy) must be authenticated by secretId and secretKey if turned on
+# Clients (e.g. agent and dataproxy) must be authenticated by secretId and secretKey if turned on
 openapi.auth.enabled=false
 
-# audit view by role, see audit id definitions: https://inlong.apache.org/docs/modules/audit/overview#audit-id
+# Audit view by role, see audit id definitions: https://inlong.apache.org/docs/modules/audit/overview#audit-id
 audit.admin.ids=3,4,5,6,7,8
 audit.user.ids=3,4,5,6,7,8