You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/29 10:54:28 UTC

[inlong] branch master updated: [INLONG-4796][Manager] Support management of Inlong cluster tag (#4800)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b3ae294b [INLONG-4796][Manager] Support management of Inlong cluster tag (#4800)
3b3ae294b is described below

commit 3b3ae294bdb0f6c859a243ec321a685e33de1344
Author: healzhou <he...@gmail.com>
AuthorDate: Wed Jun 29 18:54:22 2022 +0800

    [INLONG-4796][Manager] Support management of Inlong cluster tag (#4800)
---
 .../client/api/inner/InnerInlongManagerClient.java |   2 +-
 .../api/inner/InnerInlongManagerClientTest.java    |   4 +-
 .../pojo/cluster/ClusterTagPageRequest.java}       |  47 +++--
 .../common/pojo/cluster/ClusterTagRequest.java}    |  36 ++--
 ...ongClusterInfo.java => ClusterTagResponse.java} |  37 +---
 .../common/pojo/cluster/InlongClusterInfo.java     |   4 +-
 .../common/pojo/cluster/InlongClusterRequest.java  |   4 +-
 .../common/pojo/dataproxy/CacheCluster.java        |  12 +-
 .../manager/dao/entity/InlongClusterEntity.java    |   2 +-
 ...sterEntity.java => InlongClusterTagEntity.java} |  15 +-
 .../dao/mapper/InlongClusterEntityMapper.java      |   8 +-
 .../dao/mapper/InlongClusterNodeEntityMapper.java  |   2 -
 ...pper.java => InlongClusterTagEntityMapper.java} |  24 +--
 .../main/resources/mappers/ClusterSetMapper.xml    |  22 +--
 .../mappers/InlongClusterEntityMapper.xml          | 162 +++-------------
 .../mappers/InlongClusterNodeEntityMapper.xml      |  80 --------
 .../mappers/InlongClusterTagEntityMapper.xml       | 132 +++++++++++++
 .../service/cluster/AbstractClusterOperator.java   |   5 +-
 .../service/cluster/InlongClusterService.java      |  46 +++++
 .../service/cluster/InlongClusterServiceImpl.java  | 119 +++++++++++-
 .../service/core/impl/DataNodeServiceImpl.java     |   7 +-
 .../repository/DataProxyConfigRepository.java      |   2 +-
 .../service/sink/StreamSinkServiceImpl.java        |   2 +-
 .../service/source/StreamSourceServiceImpl.java    |   2 +-
 .../service/cluster/InlongClusterServiceTest.java  |   6 +-
 .../main/resources/h2/apache_inlong_manager.sql    | 213 +++++++++++----------
 .../manager-web/sql/apache_inlong_manager.sql      | 194 ++++++++++---------
 .../web/controller/InlongClusterController.java    |  75 ++++++--
 28 files changed, 694 insertions(+), 570 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 2b711e25b..d3936b2aa 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -135,7 +135,7 @@ public class InnerInlongManagerClient {
     public Integer saveCluster(InlongClusterRequest request) {
         AssertUtils.notEmpty(request.getName(), "cluster name should not be empty");
         AssertUtils.notEmpty(request.getType(), "cluster type should not be empty");
-        AssertUtils.notEmpty(request.getClusterTag(), "cluster tag should not be empty");
+        AssertUtils.notEmpty(request.getClusterTags(), "cluster tags should not be empty");
         Response<Integer> clusterIndexResponse = executeHttpCall(inlongClusterApi.save(request));
         assertRespSuccess(clusterIndexResponse);
         return clusterIndexResponse.getData();
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
index ba7d84bc1..89b04ca41 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
@@ -680,9 +680,9 @@ class InnerInlongManagerClientTest {
         InlongClusterRequest request = new InlongClusterRequest();
         request.setName("pulsar");
         request.setType("PULSAR");
-        request.setClusterTag("test_cluster");
+        request.setClusterTags("test_cluster");
         Integer clusterIndex = innerInlongManagerClient.saveCluster(request);
-        Assertions.assertTrue(clusterIndex == 1);
+        Assertions.assertEquals(1, (int) clusterIndex);
     }
 
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagPageRequest.java
similarity index 52%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagPageRequest.java
index 27f237083..3535e6fe7 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagPageRequest.java
@@ -15,38 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.cluster;
 
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Date;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.beans.PageRequest;
 
 /**
- * Inlong cluster entity, including name, type, cluster tag, etc.
+ * Cluster tag paging query conditions
  */
 @Data
-public class InlongClusterEntity implements Serializable {
-
-    private static final long serialVersionUID = 1L;
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Cluster tag paging query request")
+public class ClusterTagPageRequest extends PageRequest {
 
-    private Integer id;
-    private String name;
-    private String type;
-    private String url;
-    private String clusterTag;
-    private String extTag;
-    private String token;
-
-    private String extParams;
-    private String heartbeat;
-    private String inCharges;
+    @ApiModelProperty(value = "Keywords, used for fuzzy query")
+    private String keyword;
 
+    @ApiModelProperty(value = "Status")
     private Integer status;
-    private Integer isDeleted;
-    private String creator;
-    private String modifier;
-    private Date createTime;
-    private Date modifyTime;
 
-}
\ No newline at end of file
+    @ApiModelProperty(value = "Current user", hidden = true)
+    private String currentUser;
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagRequest.java
similarity index 61%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagRequest.java
index 27f237083..ef2ffd7d2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagRequest.java
@@ -15,38 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.cluster;
 
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
-import java.io.Serializable;
-import java.util.Date;
+import javax.validation.constraints.NotNull;
 
 /**
- * Inlong cluster entity, including name, type, cluster tag, etc.
+ * Inlong cluster tag request
  */
 @Data
-public class InlongClusterEntity implements Serializable {
-
-    private static final long serialVersionUID = 1L;
+@ApiModel("Cluster tag request")
+public class ClusterTagRequest {
 
+    @ApiModelProperty(value = "Primary key")
     private Integer id;
-    private String name;
-    private String type;
-    private String url;
+
+    @NotNull
+    @ApiModelProperty(value = "Cluster tag")
     private String clusterTag;
-    private String extTag;
-    private String token;
 
+    @ApiModelProperty(value = "Extended params")
     private String extParams;
-    private String heartbeat;
-    private String inCharges;
 
-    private Integer status;
-    private Integer isDeleted;
-    private String creator;
-    private String modifier;
-    private Date createTime;
-    private Date modifyTime;
+    @ApiModelProperty(value = "Name of in charges, separated by commas")
+    private String inCharges;
 
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagResponse.java
similarity index 61%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagResponse.java
index c34e50427..c85098997 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagResponse.java
@@ -18,50 +18,25 @@
 package org.apache.inlong.manager.common.pojo.cluster;
 
 import com.fasterxml.jackson.annotation.JsonFormat;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
 import java.util.Date;
 
 /**
- * Inlong cluster info
+ * Inlong cluster tag response
  */
 @Data
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Inlong cluster info")
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
-public class InlongClusterInfo {
+@ApiModel("Cluster tag response")
+public class ClusterTagResponse {
 
     @ApiModelProperty(value = "Primary key")
     private Integer id;
 
-    @ApiModelProperty(value = "Cluster name")
-    private String name;
-
-    @ApiModelProperty(value = "Cluster type, including TUBE, PULSAR, DATA_PROXY, etc.")
-    private String type;
-
-    @ApiModelProperty(value = "Cluster url")
-    private String url;
-
-    @ApiModelProperty(value = "Cluster type, including TUBE, PULSAR, DATA_PROXY, etc.")
+    @ApiModelProperty(value = "Cluster tag")
     private String clusterTag;
 
-    @ApiModelProperty(value = "Extension tag")
-    private String extTag;
-
-    @ApiModelProperty(value = "Cluster token")
-    private String token;
-
-    @ApiModelProperty(value = "Cluster heartbeat info")
-    private String heartbeat;
-
     @ApiModelProperty(value = "Extended params")
     private String extParams;
 
@@ -83,8 +58,4 @@ public class InlongClusterInfo {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
-    public InlongClusterRequest genRequest() {
-        return CommonBeanUtils.copyProperties(this, InlongClusterRequest::new);
-    }
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
index c34e50427..690d51a75 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
@@ -50,8 +50,8 @@ public class InlongClusterInfo {
     @ApiModelProperty(value = "Cluster url")
     private String url;
 
-    @ApiModelProperty(value = "Cluster type, including TUBE, PULSAR, DATA_PROXY, etc.")
-    private String clusterTag;
+    @ApiModelProperty(value = "Cluster tags, separated by commas")
+    private String clusterTags;
 
     @ApiModelProperty(value = "Extension tag")
     private String extTag;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterRequest.java
index 70ce56ea1..9d699649c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterRequest.java
@@ -53,8 +53,8 @@ public class InlongClusterRequest {
     private String url;
 
     @NotBlank
-    @ApiModelProperty(value = "Cluster tag")
-    private String clusterTag;
+    @ApiModelProperty(value = "Cluster tags, separated by commas")
+    private String clusterTags;
 
     @ApiModelProperty(value = "Extension tag")
     private String extTag;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
index 8d71adc9b..1d130c32d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
@@ -24,7 +24,7 @@ public class CacheCluster {
 
     private String clusterName;
     private String type;
-    private String clusterTag;
+    private String clusterTags;
     private String extTag;
     private String extParams;
 
@@ -69,17 +69,17 @@ public class CacheCluster {
      *
      * @return the clusterTag
      */
-    public String getClusterTag() {
-        return clusterTag;
+    public String getClusterTags() {
+        return clusterTags;
     }
 
     /**
      * set clusterTag
      *
-     * @param clusterTag the clusterTag to set
+     * @param clusterTags the clusterTags to set
      */
-    public void setClusterTag(String clusterTag) {
-        this.clusterTag = clusterTag;
+    public void setClusterTags(String clusterTags) {
+        this.clusterTags = clusterTags;
     }
 
     /**
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
index 27f237083..46b67f765 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
@@ -34,7 +34,7 @@ public class InlongClusterEntity implements Serializable {
     private String name;
     private String type;
     private String url;
-    private String clusterTag;
+    private String clusterTags;
     private String extTag;
     private String token;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterTagEntity.java
similarity index 81%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterTagEntity.java
index 27f237083..7bc2b379f 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterTagEntity.java
@@ -23,25 +23,16 @@ import java.io.Serializable;
 import java.util.Date;
 
 /**
- * Inlong cluster entity, including name, type, cluster tag, etc.
+ * Inlong cluster tag entity.
  */
 @Data
-public class InlongClusterEntity implements Serializable {
+public class InlongClusterTagEntity implements Serializable {
 
     private static final long serialVersionUID = 1L;
-
     private Integer id;
-    private String name;
-    private String type;
-    private String url;
     private String clusterTag;
-    private String extTag;
-    private String token;
-
     private String extParams;
-    private String heartbeat;
     private String inCharges;
-
     private Integer status;
     private Integer isDeleted;
     private String creator;
@@ -49,4 +40,4 @@ public class InlongClusterEntity implements Serializable {
     private Date createTime;
     private Date modifyTime;
 
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index c25e7ff60..b42103ebd 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -29,22 +29,20 @@ public interface InlongClusterEntityMapper {
 
     int insert(InlongClusterEntity record);
 
-    int insertSelective(InlongClusterEntity record);
-
     InlongClusterEntity selectById(Integer id);
 
     /**
-     * Select clusters by tag, name and type, the tag and name can be null.
+     * Select clusters by cluster tag, name and type; the cluster tag and name can be null.
      */
     List<InlongClusterEntity> selectByKey(@Param("clusterTag") String clusterTag, @Param("name") String name,
             @Param("type") String type);
 
     List<InlongClusterEntity> selectByCondition(InlongClusterPageRequest request);
 
-    int updateByIdSelective(InlongClusterEntity record);
-
     int updateById(InlongClusterEntity record);
 
+    int updateByIdSelective(InlongClusterEntity record);
+
     int deleteByPrimaryKey(Integer id);
 
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index 9847f05b8..7a13d9939 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -30,8 +30,6 @@ public interface InlongClusterNodeEntityMapper {
 
     int insert(InlongClusterNodeEntity record);
 
-    int insertSelective(InlongClusterNodeEntity record);
-
     InlongClusterNodeEntity selectById(Integer id);
 
     InlongClusterNodeEntity selectByUniqueKey(ClusterNodeRequest request);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterTagEntityMapper.java
similarity index 58%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterTagEntityMapper.java
index c25e7ff60..2d0d111bc 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterTagEntityMapper.java
@@ -18,32 +18,26 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
 import org.springframework.stereotype.Repository;
 
 import java.util.List;
 
 @Repository
-public interface InlongClusterEntityMapper {
+public interface InlongClusterTagEntityMapper {
 
-    int insert(InlongClusterEntity record);
+    int insert(InlongClusterTagEntity record);
 
-    int insertSelective(InlongClusterEntity record);
+    InlongClusterTagEntity selectById(Integer id);
 
-    InlongClusterEntity selectById(Integer id);
+    InlongClusterTagEntity selectByTag(@Param("clusterTag") String clusterTag);
 
-    /**
-     * Select clusters by tag, name and type, the tag and name can be null.
-     */
-    List<InlongClusterEntity> selectByKey(@Param("clusterTag") String clusterTag, @Param("name") String name,
-            @Param("type") String type);
+    List<InlongClusterTagEntity> selectByCondition(ClusterTagPageRequest request);
 
-    List<InlongClusterEntity> selectByCondition(InlongClusterPageRequest request);
+    int updateById(InlongClusterTagEntity record);
 
-    int updateByIdSelective(InlongClusterEntity record);
-
-    int updateById(InlongClusterEntity record);
+    int updateByPrimaryKeySelective(InlongClusterTagEntity record);
 
     int deleteByPrimaryKey(Integer id);
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 775d8f21d..c0aa2f5cf 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -19,38 +19,32 @@
 -->
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper
-        namespace="org.apache.inlong.manager.dao.mapper.ClusterSetMapper">
-    <select id="selectProxyCluster"
-            resultType="org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster">
-        select name as cluster_name, cluster_tag, ext_tag, ext_params
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ClusterSetMapper">
+    <select id="selectProxyCluster" resultType="org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster">
+        select name as cluster_name, cluster_tags, ext_tag, ext_params
         from inlong_cluster
         where type = 'DATA_PROXY'
           and is_deleted = '0'
     </select>
-    <select id="selectCacheCluster"
-            resultType="org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster">
+    <select id="selectCacheCluster" resultType="org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster">
         select name as cluster_name,
                type,
-               cluster_tag,
+               cluster_tags,
                ext_tag,
                ext_params
         from inlong_cluster
         where type in ('PULSAR', 'KAFKA', 'TUBE')
           and is_deleted = '0'
     </select>
-    <select id="selectInlongGroupId"
-            resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId">
+    <select id="selectInlongGroupId" resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId">
         select inlong_group_id,
-               inlong_cluster_tag as
-                                     cluster_tag,
+               inlong_cluster_tag as cluster_tag,
                mq_resource        as topic,
                ext_params
         from inlong_group
         where is_deleted = 0
     </select>
-    <select id="selectInlongStreamId"
-            resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId">
+    <select id="selectInlongStreamId" resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId">
         select inlong_group_id,
                inlong_stream_id,
                mq_resource as topic,
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index 52b02f22c..4edbc18ba 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -25,7 +25,7 @@
         <result column="name" jdbcType="VARCHAR" property="name"/>
         <result column="type" jdbcType="VARCHAR" property="type"/>
         <result column="url" jdbcType="VARCHAR" property="url"/>
-        <result column="cluster_tag" jdbcType="VARCHAR" property="clusterTag"/>
+        <result column="cluster_tags" jdbcType="VARCHAR" property="clusterTags"/>
         <result column="ext_tag" jdbcType="VARCHAR" property="extTag"/>
         <result column="token" jdbcType="VARCHAR" property="token"/>
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
@@ -39,129 +39,25 @@
         <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
     </resultMap>
     <sql id="Base_Column_List">
-        id, name, type, url, cluster_tag, ext_tag, token, ext_params, heartbeat,
+        id, name, type, url, cluster_tags, ext_tag, token, ext_params, heartbeat,
         in_charges, status, is_deleted, creator, modifier, create_time, modify_time
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
         insert into inlong_cluster (id, name, type,
-                                    url, cluster_tag, ext_tag,
+                                    url, cluster_tags, ext_tag,
                                     token, ext_params, heartbeat,
                                     in_charges, status, is_deleted,
                                     creator, modifier,
                                     create_time, modify_time)
         values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR},
-                #{url,jdbcType=VARCHAR}, #{clusterTag,jdbcType=VARCHAR}, #{extTag,jdbcType=VARCHAR},
+                #{url,jdbcType=VARCHAR}, #{clusterTags,jdbcType=VARCHAR}, #{extTag,jdbcType=VARCHAR},
                 #{token,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, #{heartbeat,jdbcType=LONGVARCHAR},
                 #{inCharges,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
                 #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
                 #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
     </insert>
-    <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
-            parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
-        insert into inlong_cluster
-        <trim prefix="(" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                id,
-            </if>
-            <if test="name != null">
-                name,
-            </if>
-            <if test="type != null">
-                type,
-            </if>
-            <if test="url != null">
-                url,
-            </if>
-            <if test="clusterTag != null">
-                cluster_tag,
-            </if>
-            <if test="extTag != null">
-                ext_tag,
-            </if>
-            <if test="token != null">
-                token,
-            </if>
-            <if test="inCharges != null">
-                in_charges,
-            </if>
-            <if test="status != null">
-                status,
-            </if>
-            <if test="isDeleted != null">
-                is_deleted,
-            </if>
-            <if test="creator != null">
-                creator,
-            </if>
-            <if test="modifier != null">
-                modifier,
-            </if>
-            <if test="createTime != null">
-                create_time,
-            </if>
-            <if test="modifyTime != null">
-                modify_time,
-            </if>
-            <if test="extParams != null">
-                ext_params,
-            </if>
-            <if test="heartbeat != null">
-                heartbeat,
-            </if>
-        </trim>
-        <trim prefix="values (" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                #{id,jdbcType=INTEGER},
-            </if>
-            <if test="name != null">
-                #{name,jdbcType=VARCHAR},
-            </if>
-            <if test="type != null">
-                #{type,jdbcType=VARCHAR},
-            </if>
-            <if test="url != null">
-                #{url,jdbcType=VARCHAR},
-            </if>
-            <if test="clusterTag != null">
-                #{clusterTag,jdbcType=VARCHAR},
-            </if>
-            <if test="extTag != null">
-                #{extTag,jdbcType=VARCHAR},
-            </if>
-            <if test="token != null">
-                #{token,jdbcType=VARCHAR},
-            </if>
-            <if test="inCharges != null">
-                #{inCharges,jdbcType=VARCHAR},
-            </if>
-            <if test="status != null">
-                #{status,jdbcType=INTEGER},
-            </if>
-            <if test="isDeleted != null">
-                #{isDeleted,jdbcType=INTEGER},
-            </if>
-            <if test="creator != null">
-                #{creator,jdbcType=VARCHAR},
-            </if>
-            <if test="modifier != null">
-                #{modifier,jdbcType=VARCHAR},
-            </if>
-            <if test="createTime != null">
-                #{createTime,jdbcType=TIMESTAMP},
-            </if>
-            <if test="modifyTime != null">
-                #{modifyTime,jdbcType=TIMESTAMP},
-            </if>
-            <if test="extParams != null">
-                #{extParams,jdbcType=LONGVARCHAR},
-            </if>
-            <if test="heartbeat != null">
-                #{heartbeat,jdbcType=LONGVARCHAR},
-            </if>
-        </trim>
-    </insert>
 
     <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
         select
@@ -169,26 +65,6 @@
         from inlong_cluster
         where id = #{id,jdbcType=INTEGER}
     </select>
-
-    <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
-        update inlong_cluster
-        set name        = #{name,jdbcType=VARCHAR},
-            type        = #{type,jdbcType=VARCHAR},
-            url         = #{url,jdbcType=VARCHAR},
-            cluster_tag = #{clusterTag,jdbcType=VARCHAR},
-            ext_tag     = #{extTag,jdbcType=VARCHAR},
-            token       = #{token,jdbcType=VARCHAR},
-            ext_params  = #{extParams,jdbcType=LONGVARCHAR},
-            heartbeat   = #{heartbeat,jdbcType=LONGVARCHAR},
-            in_charges  = #{inCharges,jdbcType=VARCHAR},
-            status      = #{status,jdbcType=INTEGER},
-            is_deleted  = #{isDeleted,jdbcType=INTEGER},
-            creator     = #{creator,jdbcType=VARCHAR},
-            modifier    = #{modifier,jdbcType=VARCHAR},
-            create_time = #{createTime,jdbcType=TIMESTAMP},
-            modify_time = #{modifyTime,jdbcType=TIMESTAMP}
-        where id = #{id,jdbcType=INTEGER}
-    </update>
     <select id="selectByKey" parameterType="org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest"
             resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
         select
@@ -198,7 +74,7 @@
             is_deleted = 0
             and `type` = #{type, jdbcType=VARCHAR}
             <if test="clusterTag != null and clusterTag != ''">
-                and cluster_tag = #{clusterTag, jdbcType=VARCHAR}
+                and find_in_set(#{clusterTag, jdbcType=VARCHAR}, cluster_tags)
             </if>
             <if test="name != null and name != ''">
                 and name = #{name, jdbcType=VARCHAR}
@@ -227,16 +103,15 @@
                 </foreach>
             </if>
             <if test="clusterTag != null and clusterTag != ''">
-                and cluster_tag = #{clusterTag, jdbcType=VARCHAR}
+                and find_in_set(#{clusterTag, jdbcType=VARCHAR}, cluster_tags)
             </if>
             <if test="extTag != null and extTag != ''">
                 and ext_tag = #{extTag, jdbcType=VARCHAR}
             </if>
             <if test="keyword != null and keyword != ''">
-                name like CONCAT('%', #{keyword}, '%')
-                or cluster_tag like CONCAT('%', #{keyword}, '%')
+                <!-- group the OR branches so the keyword match is AND-ed with the filters above -->
+                and (name like CONCAT('%', #{keyword}, '%')
+                or cluster_tags like CONCAT('%', #{keyword}, '%')
                 or ext_tag like CONCAT('%', #{keyword}, '%')
-                or url like CONCAT('%', #{keyword}, '%')
                 )
             </if>
             <if test="status != null and status != ''">
@@ -246,6 +121,25 @@
         order by modify_time desc
     </select>
 
+    <!-- Full update by id: every listed column is overwritten unconditionally (no <if> guards),
+         in contrast to updateByIdSelective, which only writes the non-null properties. -->
+    <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
+        update inlong_cluster
+        set name         = #{name,jdbcType=VARCHAR},
+            type         = #{type,jdbcType=VARCHAR},
+            url          = #{url,jdbcType=VARCHAR},
+            cluster_tags = #{clusterTags,jdbcType=VARCHAR},
+            ext_tag      = #{extTag,jdbcType=VARCHAR},
+            token        = #{token,jdbcType=VARCHAR},
+            ext_params   = #{extParams,jdbcType=LONGVARCHAR},
+            heartbeat    = #{heartbeat,jdbcType=LONGVARCHAR},
+            in_charges   = #{inCharges,jdbcType=VARCHAR},
+            status       = #{status,jdbcType=INTEGER},
+            is_deleted   = #{isDeleted,jdbcType=INTEGER},
+            creator      = #{creator,jdbcType=VARCHAR},
+            modifier     = #{modifier,jdbcType=VARCHAR},
+            create_time  = #{createTime,jdbcType=TIMESTAMP},
+            modify_time  = #{modifyTime,jdbcType=TIMESTAMP}
+        where id = #{id,jdbcType=INTEGER}
+    </update>
     <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
         update inlong_cluster
         <set>
@@ -258,8 +152,8 @@
             <if test="url != null">
                 url = #{url,jdbcType=VARCHAR},
             </if>
-            <if test="clusterTag != null">
-                cluster_tag = #{clusterTag,jdbcType=VARCHAR},
+            <if test="clusterTags != null">
+                cluster_tags = #{clusterTags,jdbcType=VARCHAR},
             </if>
             <if test="extTag != null">
                 ext_tag = #{extTag,jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 732ed3c8a..3be803730 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -50,86 +50,6 @@
                 #{status,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
                 #{modifier,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
     </insert>
-    <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
-            parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
-        insert into inlong_cluster_node
-        <trim prefix="(" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                id,
-            </if>
-            <if test="parentId != null">
-                parent_id,
-            </if>
-            <if test="type != null">
-                type,
-            </if>
-            <if test="ip != null">
-                ip,
-            </if>
-            <if test="port != null">
-                port,
-            </if>
-            <if test="extParams != null">
-                ext_params,
-            </if>
-            <if test="status != null">
-                status,
-            </if>
-            <if test="isDeleted != null">
-                is_deleted,
-            </if>
-            <if test="creator != null">
-                creator,
-            </if>
-            <if test="modifier != null">
-                modifier,
-            </if>
-            <if test="createTime != null">
-                create_time,
-            </if>
-            <if test="modifyTime != null">
-                modify_time,
-            </if>
-        </trim>
-        <trim prefix="values (" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                #{id,jdbcType=INTEGER},
-            </if>
-            <if test="parentId != null">
-                #{parentId,jdbcType=INTEGER},
-            </if>
-            <if test="type != null">
-                #{type,jdbcType=VARCHAR},
-            </if>
-            <if test="ip != null">
-                #{ip,jdbcType=VARCHAR},
-            </if>
-            <if test="port != null">
-                #{port,jdbcType=INTEGER},
-            </if>
-            <if test="extParams != null">
-                #{extParams,jdbcType=LONGVARCHAR},
-            </if>
-            <if test="status != null">
-                #{status,jdbcType=INTEGER},
-            </if>
-            <if test="isDeleted != null">
-                #{isDeleted,jdbcType=INTEGER},
-            </if>
-            <if test="creator != null">
-                #{creator,jdbcType=VARCHAR},
-            </if>
-            <if test="modifier != null">
-                #{modifier,jdbcType=VARCHAR},
-            </if>
-            <if test="createTime != null">
-                #{createTime,jdbcType=TIMESTAMP},
-            </if>
-            <if test="modifyTime != null">
-                #{modifyTime,jdbcType=TIMESTAMP},
-            </if>
-        </trim>
-    </insert>
 
     <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
         select
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterTagEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterTagEntityMapper.xml
new file mode 100644
index 000000000..01df6f986
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterTagEntityMapper.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.InlongClusterTagEntityMapper">
+    <!-- Maps the snake_case columns of Base_Column_List onto the camelCase properties of InlongClusterTagEntity -->
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="cluster_tag" jdbcType="VARCHAR" property="clusterTag"/>
+        <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
+        <result column="in_charges" jdbcType="VARCHAR" property="inCharges"/>
+        <result column="status" jdbcType="INTEGER" property="status"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+        <result column="creator" jdbcType="VARCHAR" property="creator"/>
+        <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+    </resultMap>
+    <!-- Column list shared by the select statements of this mapper through <include refid="Base_Column_List"/> -->
+    <sql id="Base_Column_List">
+        id, cluster_tag, ext_params, in_charges, status, is_deleted, creator, modifier, create_time, modify_time
+    </sql>
+
+    <!-- Inserts one tag row and writes the generated key back into the entity's id property.
+         All ten columns are bound explicitly, so the caller has to populate is_deleted, creator, times, etc. -->
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            parameterType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+        insert into inlong_cluster_tag (id, cluster_tag, ext_params,
+                                        in_charges, status, is_deleted,
+                                        creator, modifier,
+                                        create_time, modify_time)
+        values (#{id,jdbcType=INTEGER}, #{clusterTag,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+                #{inCharges,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
+                #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
+                #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
+    </insert>
+
+    <!-- Loads a tag by primary key. There is no is_deleted filter here, so logically deleted rows are returned too. -->
+    <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_cluster_tag
+        where id = #{id,jdbcType=INTEGER}
+    </select>
+    <!-- Finds the undeleted tag with the given name. Uses BaseResultMap, like selectById, so the
+         snake_case columns are mapped explicitly instead of relying on global camel-case auto-mapping. -->
+    <select id="selectByTag" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_cluster_tag
+        where cluster_tag = #{clusterTag, jdbcType=VARCHAR}
+        and is_deleted = 0
+    </select>
+    <!-- Lists undeleted tags, optionally filtered by a keyword contained in the tag name, newest change first.
+         Uses BaseResultMap, like selectById, so the column mapping does not depend on auto-mapping settings. -->
+    <select id="selectByCondition"
+            parameterType="org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest"
+            resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_cluster_tag
+        <where>
+            is_deleted = 0
+            <if test="keyword != null and keyword != ''">
+                and cluster_tag like CONCAT('%', #{keyword}, '%')
+            </if>
+        </where>
+        order by modify_time desc
+    </select>
+
+    <!-- Full update by id: every listed column is overwritten unconditionally (no <if> guards),
+         so the entity should be loaded first and then modified before it is passed in. -->
+    <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+        update inlong_cluster_tag
+        set cluster_tag = #{clusterTag,jdbcType=VARCHAR},
+            ext_params  = #{extParams,jdbcType=LONGVARCHAR},
+            in_charges  = #{inCharges,jdbcType=VARCHAR},
+            status      = #{status,jdbcType=INTEGER},
+            is_deleted  = #{isDeleted,jdbcType=INTEGER},
+            creator     = #{creator,jdbcType=VARCHAR},
+            modifier    = #{modifier,jdbcType=VARCHAR},
+            create_time = #{createTime,jdbcType=TIMESTAMP},
+            modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+        where id = #{id,jdbcType=INTEGER}
+    </update>
+    <!-- Partial update by id: only the properties that are not null are written, all other columns keep
+         their current value. The <set> element drops the trailing comma of the last emitted assignment. -->
+    <update id="updateByPrimaryKeySelective"
+            parameterType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+        update inlong_cluster_tag
+        <set>
+            <if test="clusterTag != null">
+                cluster_tag = #{clusterTag,jdbcType=VARCHAR},
+            </if>
+            <if test="extParams != null">
+                ext_params = #{extParams,jdbcType=LONGVARCHAR},
+            </if>
+            <if test="inCharges != null">
+                in_charges = #{inCharges,jdbcType=VARCHAR},
+            </if>
+            <if test="status != null">
+                status = #{status,jdbcType=INTEGER},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
+            <if test="creator != null">
+                creator = #{creator,jdbcType=VARCHAR},
+            </if>
+            <if test="modifier != null">
+                modifier = #{modifier,jdbcType=VARCHAR},
+            </if>
+            <if test="createTime != null">
+                create_time = #{createTime,jdbcType=TIMESTAMP},
+            </if>
+            <if test="modifyTime != null">
+                modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+            </if>
+        </set>
+        where id = #{id,jdbcType=INTEGER}
+    </update>
+
+    <!-- Physically removes the row with the given id. Note that InlongClusterServiceImpl#deleteTag in this
+         commit does not call it: it deletes logically by writing is_deleted through updateById. -->
+    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+        delete
+        from inlong_cluster_tag
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
index 807988ff3..1aaaa0299 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
@@ -44,7 +44,10 @@ public abstract class AbstractClusterOperator implements InlongClusterOperator {
         this.setTargetEntity(request, entity);
 
         entity.setCreator(operator);
-        entity.setCreateTime(new Date());
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
         entity.setIsDeleted(InlongConstants.UN_DELETED);
         clusterMapper.insert(entity);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index ad851dbf4..80d4d4ca3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -21,6 +21,9 @@ import com.github.pagehelper.PageInfo;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
@@ -33,6 +36,49 @@ import java.util.List;
  */
 public interface InlongClusterService {
 
+    /**
+     * Save a new cluster tag.
+     *
+     * @param request cluster tag to save, the tag name is required
+     * @param operator name of operator, recorded as creator and modifier
+     * @return cluster tag id after saving
+     */
+    Integer saveTag(ClusterTagRequest request, String operator);
+
+    /**
+     * Get cluster tag by id.
+     *
+     * @param id cluster tag id, must not be null
+     * @return cluster tag info
+     */
+    ClusterTagResponse getTag(Integer id);
+
+    /**
+     * Paging query cluster tags according to conditions.
+     *
+     * @param request page request conditions: page number, page size and an optional keyword
+     *         that is matched against the tag name
+     * @return one page of cluster tags
+     */
+    PageInfo<ClusterTagResponse> listTag(ClusterTagPageRequest request);
+
+    /**
+     * Update an existing cluster tag.
+     *
+     * @param request cluster tag to be modified, its id and tag name are required
+     * @param operator current operator, recorded as modifier
+     * @return whether succeed
+     */
+    Boolean updateTag(ClusterTagRequest request, String operator);
+
+    /**
+     * Delete cluster tag.
+     *
+     * @param id cluster tag id to be deleted, must not be null
+     * @param operator current operator, recorded as modifier
+     * @return whether succeed; the implementation in this commit returns false when the tag
+     *         does not exist or is already deleted
+     */
+    Boolean deleteTag(Integer id, String operator);
+
     /**
      * Save cluster info.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 47ba191c0..17a49dca0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -36,6 +36,9 @@ import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagResponse;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
@@ -48,8 +51,10 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
 import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterTagEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
@@ -83,19 +88,118 @@ public class InlongClusterServiceImpl implements InlongClusterService {
     @Autowired
     private InlongClusterOperatorFactory clusterOperatorFactory;
     @Autowired
+    private InlongClusterTagEntityMapper clusterTagMapper;
+    @Autowired
     private InlongClusterEntityMapper clusterMapper;
     @Autowired
     private InlongClusterNodeEntityMapper clusterNodeMapper;
     @Autowired
     private DataProxyConfigRepository proxyRepository;
 
+    /**
+     * Saves a new cluster tag after checking that no undeleted tag already uses the same name.
+     *
+     * @param request cluster tag to save, its clusterTag must not be null
+     * @param operator name of the operator, recorded as both creator and modifier
+     * @return id of the saved cluster tag
+     * @throws BusinessException if an undeleted tag with the same name already exists
+     */
+    @Override
+    public Integer saveTag(ClusterTagRequest request, String operator) {
+        LOGGER.debug("begin to save cluster tag {}", request);
+        Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
+        Preconditions.checkNotNull(request.getClusterTag(), "cluster tag cannot be empty");
+
+        // check if the cluster tag already exist
+        String clusterTag = request.getClusterTag();
+        InlongClusterTagEntity exist = clusterTagMapper.selectByTag(clusterTag);
+        if (exist != null) {
+            String errMsg = String.format("inlong cluster tag already exist for tag=%s", clusterTag);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        InlongClusterTagEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
+        // the insert statement binds is_deleted explicitly, and the tag queries only match is_deleted = 0,
+        // so the flag has to be initialised here, as the other save methods of this service do
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
+        clusterTagMapper.insert(entity);
+        LOGGER.info("success to save cluster tag={} by user={}", request, operator);
+        return entity.getId();
+    }
+
+    /**
+     * Looks up a cluster tag by id and converts the entity into a response object.
+     *
+     * @param id cluster tag id, must not be null
+     * @return cluster tag info copied from the stored entity
+     * @throws BusinessException with CLUSTER_NOT_FOUND if no row exists for the id
+     */
+    @Override
+    public ClusterTagResponse getTag(Integer id) {
+        Preconditions.checkNotNull(id, "inlong cluster tag id cannot be empty");
+        final InlongClusterTagEntity tagEntity = clusterTagMapper.selectById(id);
+        if (tagEntity != null) {
+            ClusterTagResponse tagResponse = CommonBeanUtils.copyProperties(tagEntity, ClusterTagResponse::new);
+            LOGGER.debug("success to get cluster tag info by id={}", id);
+            return tagResponse;
+        }
+
+        // nothing stored under this id
+        LOGGER.error("inlong cluster tag not found by id={}", id);
+        throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+    }
+
+    /**
+     * Pages through the undeleted cluster tags that match the request.
+     *
+     * @param request page request conditions (page number, page size, optional keyword)
+     * @return the requested page, with the total taken from the paged query result
+     */
+    @Override
+    public PageInfo<ClusterTagResponse> listTag(ClusterTagPageRequest request) {
+        PageHelper.startPage(request.getPageNum(), request.getPageSize());
+        Page<InlongClusterTagEntity> entityPage = (Page<InlongClusterTagEntity>) clusterTagMapper
+                .selectByCondition(request);
+        List<ClusterTagResponse> tagList = CommonBeanUtils.copyListProperties(entityPage, ClusterTagResponse::new);
+        PageInfo<ClusterTagResponse> page = new PageInfo<>(tagList);
+        // tagList only holds the current page, the number of all matching rows is kept by the PageHelper Page
+        page.setTotal(entityPage.getTotal());
+
+        LOGGER.debug("success to list cluster tag by {}", request);
+        return page;
+    }
+
+    /**
+     * Updates an existing cluster tag; the new tag name must not belong to a different undeleted tag.
+     *
+     * @param request cluster tag to be modified, id and clusterTag must not be null
+     * @param operator current operator, recorded as modifier
+     * @return always true when no exception is thrown
+     * @throws BusinessException if the name is used by another tag, or no tag exists for the id
+     */
+    @Override
+    public Boolean updateTag(ClusterTagRequest request, String operator) {
+        LOGGER.debug("begin to update cluster tag={}", request);
+        Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
+        final String newTag = request.getClusterTag();
+        Preconditions.checkNotNull(newTag, "inlong cluster tag cannot be empty");
+        final Integer tagId = request.getId();
+        Preconditions.checkNotNull(tagId, "cluster tag id cannot be empty");
+
+        // the requested name may only be held by the record that is being updated
+        InlongClusterTagEntity sameTag = clusterTagMapper.selectByTag(newTag);
+        boolean usedByOther = sameTag != null && !Objects.equals(tagId, sameTag.getId());
+        if (usedByOther) {
+            String errMsg = String.format("inlong cluster tag already exist for tag=%s", newTag);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        InlongClusterTagEntity target = clusterTagMapper.selectById(tagId);
+        if (target == null) {
+            LOGGER.error("cluster tag not found by id={}", tagId);
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+        }
+
+        // overlay the request onto the stored entity, then stamp who changed it and when
+        CommonBeanUtils.copyProperties(request, target, true);
+        target.setModifier(operator);
+        target.setModifyTime(new Date());
+        clusterTagMapper.updateById(target);
+        LOGGER.info("success to update cluster tag={}", request);
+        return true;
+    }
+
+    /**
+     * Logically deletes a cluster tag by writing its own id into the is_deleted column.
+     *
+     * @param id cluster tag id to be deleted, must not be null
+     * @param operator current operator, recorded as modifier
+     * @return true if the tag was marked as deleted, false if it does not exist or is already deleted
+     */
+    @Override
+    public Boolean deleteTag(Integer id, String operator) {
+        Preconditions.checkNotNull(id, "cluster tag id cannot be empty");
+        InlongClusterTagEntity entity = clusterTagMapper.selectById(id);
+        // a row whose is_deleted is NULL counts as not deleted; comparing it directly would fail on unboxing
+        Integer deleted = entity == null ? null : entity.getIsDeleted();
+        if (entity == null || (deleted != null && deleted > InlongConstants.UN_DELETED)) {
+            LOGGER.error("inlong cluster tag not found by id={}", id);
+            return false;
+        }
+        entity.setIsDeleted(entity.getId());
+        entity.setModifier(operator);
+        // updateById writes modify_time as given, so refresh it together with the modifier, as updateTag does
+        entity.setModifyTime(new Date());
+        clusterTagMapper.updateById(entity);
+        LOGGER.info("success to delete cluster tag by id={}", id);
+        return true;
+    }
+
     @Override
     public Integer save(InlongClusterRequest request, String operator) {
         LOGGER.debug("begin to save inlong cluster={}", request);
-        Preconditions.checkNotNull(request, "inlong cluster info cannot be empty");
+        Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
 
         // check if the cluster already exist
-        String clusterTag = request.getClusterTag();
+        String clusterTag = request.getClusterTags();
         String name = request.getName();
         String type = request.getType();
         List<InlongClusterEntity> exist = clusterMapper.selectByKey(clusterTag, name, type);
@@ -166,7 +270,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         Preconditions.checkNotNull(id, "inlong cluster id cannot be empty");
 
         // check whether the cluster already exists
-        String clusterTag = request.getClusterTag();
+        String clusterTag = request.getClusterTags();
         String name = request.getName();
         String type = request.getType();
         List<InlongClusterEntity> exist = clusterMapper.selectByKey(clusterTag, name, type);
@@ -221,7 +325,10 @@ public class InlongClusterServiceImpl implements InlongClusterService {
 
         InlongClusterNodeEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
         entity.setCreator(operator);
-        entity.setCreateTime(new Date());
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
         entity.setIsDeleted(InlongConstants.UN_DELETED);
         clusterNodeMapper.insert(entity);
 
@@ -283,7 +390,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         // check cluster node if exist
         InlongClusterNodeEntity exist = clusterNodeMapper.selectByUniqueKey(request);
         if (exist != null && !Objects.equals(id, exist.getId())) {
-            String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s)",
+            String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s",
                     request.getType(), request.getIp(), request.getPort());
             LOGGER.error(errMsg);
             throw new BusinessException(errMsg);
@@ -366,7 +473,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
 
         // get all inlong groups which was successful and belongs to this data proxy cluster
         List<String> clusterTagList = clusterList.stream()
-                .map(InlongClusterEntity::getClusterTag)
+                .map(InlongClusterEntity::getClusterTags)
                 .collect(Collectors.toList());
         InlongGroupPageRequest groupRequest = InlongGroupPageRequest.builder()
                 .status(GroupStatus.CONFIG_SUCCESSFUL.getCode())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
index fcb834fd4..b394732f3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
@@ -20,8 +20,8 @@ package org.apache.inlong.manager.service.core.impl;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.common.enums.DataNodeType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.DataNodeType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.common.pojo.node.DataNodeRequest;
@@ -71,7 +71,10 @@ public class DataNodeServiceImpl implements DataNodeService {
         }
         DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
         entity.setCreator(operator);
-        entity.setCreateTime(new Date());
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
         entity.setIsDeleted(InlongConstants.UN_DELETED);
         dataNodeMapper.insert(entity);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 530f49cd5..e9088d055 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -140,7 +140,7 @@ public class DataProxyConfigRepository implements IRepository {
             Map<String, String> tagMap = MAP_SPLITTER.split(cacheCluster.getExtTag());
             String producerTag = tagMap.getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
             if (StringUtils.equalsIgnoreCase(producerTag, Boolean.TRUE.toString())) {
-                cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTag(), k -> new HashMap<>())
+                cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTags(), k -> new HashMap<>())
                         .computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster);
             }
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4d396b614..42153c93c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -104,7 +104,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         List<SinkField> fields = request.getSinkFieldList();
         // Remove id in sinkField when save
         if (CollectionUtils.isNotEmpty(fields)) {
-            fields.stream().forEach(sinkField -> sinkField.setId(null));
+            fields.forEach(sinkField -> sinkField.setId(null));
         }
         int id = operation.saveOpt(request, operator);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 62ccb3295..94220b023 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -88,7 +88,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         // Remove id in sourceField when save
         List<StreamField> streamFields = request.getFieldList();
         if (CollectionUtils.isNotEmpty(streamFields)) {
-            streamFields.stream().forEach(streamField -> streamField.setId(null));
+            streamFields.forEach(streamField -> streamField.setId(null));
         }
         int id = operation.saveOpt(request, groupEntity.getStatus(), operator);
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 0b5216654..73fc43398 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -47,7 +47,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
      */
     public Integer saveDataProxyCluster(String clusterTag, String clusterName, String extTag) {
         DataProxyClusterRequest request = new DataProxyClusterRequest();
-        request.setClusterTag(clusterTag);
+        request.setClusterTags(clusterTag);
         request.setName(clusterName);
         request.setType(ClusterType.CLS_DATA_PROXY);
         request.setExtTag(extTag);
@@ -60,7 +60,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
      */
     public Integer savePulsarCluster(String clusterTag, String clusterName, String adminUrl) {
         PulsarClusterRequest request = new PulsarClusterRequest();
-        request.setClusterTag(clusterTag);
+        request.setClusterTags(clusterTag);
         request.setName(clusterName);
         request.setType(ClusterType.CLS_PULSAR);
         request.setAdminUrl(adminUrl);
@@ -86,7 +86,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         PulsarClusterRequest request = new PulsarClusterRequest();
         request.setId(id);
         request.setName(name);
-        request.setClusterTag(clusterTag);
+        request.setClusterTags(clusterTag);
         request.setAdminUrl(adminUrl);
         request.setInCharges(GLOBAL_OPERATOR);
         return clusterService.update(request, GLOBAL_OPERATOR);
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 4db31bd8d..23491c6bc 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -33,13 +33,13 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `daily_storage`          int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
     `peak_records`           int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
     `max_length`             int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
-    `in_charges`             varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
-    `followers`              varchar(512)      DEFAULT NULL COMMENT 'Name of followers, separated by commas',
     `enable_zookeeper`       tinyint(2)        DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(2)        DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
     `lightweight`            tinyint(2)        DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-false, 1-true',
     `inlong_cluster_tag`     varchar(128)      DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
-    `ext_params`             text              DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num,',
+    `ext_params`             text              DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num',
+    `in_charges`             varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+    `followers`              varchar(512)      DEFAULT NULL COMMENT 'Name of followers, separated by commas',
     `status`                 int(4)            DEFAULT '100' COMMENT 'Inlong group status',
     `previous_status`        int(4)            DEFAULT '100' COMMENT 'Previous group status',
     `is_deleted`             int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -49,7 +49,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `modify_time`            timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
-    INDEX group_status_deleted_idx (`status`, `is_deleted`)
+    INDEX group_status_deleted_index (`status`, `is_deleted`)
 );
 
 -- ----------------------------
@@ -64,33 +64,52 @@ CREATE TABLE IF NOT EXISTS `inlong_group_ext`
     `is_deleted`      int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `modify_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    KEY `index_group_id` (`inlong_group_id`),
+    KEY `group_id_index` (`inlong_group_id`),
     UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`)
 );
 
 -- ----------------------------
--- Table structure for inlong_cluster
+-- Table structure for inlong_cluster_tag
 -- ----------------------------
-CREATE TABLE IF NOT EXISTS `inlong_cluster`
+CREATE TABLE IF NOT EXISTS `inlong_cluster_tag`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `name`        varchar(128) NOT NULL COMMENT 'Cluster name',
-    `type`        varchar(20)       DEFAULT '' COMMENT 'Cluster type, such as: TUBE, PULSAR, DATA_PROXY, etc',
-    `url`         varchar(512)      DEFAULT NULL COMMENT 'Cluster URL',
-    `cluster_tag` varchar(128)      DEFAULT NULL COMMENT 'Cluster tag, the same tab indicates that cluster belongs to the same set',
-    `ext_tag`     varchar(128)      DEFAULT NULL COMMENT 'Extension tag, for extended use',
-    `token`       varchar(512)      DEFAULT NULL COMMENT 'Cluster token',
-    `ext_params`  text              DEFAULT NULL COMMENT 'Extended params, will saved as JSON string',
-    `heartbeat`   text              DEFAULT NULL COMMENT 'Cluster heartbeat info',
+    `cluster_tag` varchar(128) NOT NULL COMMENT 'Cluster tag',
+    `ext_params`  text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
     `in_charges`  varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`      int(4)            DEFAULT '0' COMMENT 'Cluster status',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `status`      int(4)                DEFAULT '0' COMMENT 'Cluster status',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_inlong_cluster_tag` (`cluster_tag`, `is_deleted`)
+);
+
+-- ----------------------------
+-- Table structure for inlong_cluster
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_cluster`
+(
+    `id`           int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `name`         varchar(128) NOT NULL COMMENT 'Cluster name',
+    `type`         varchar(20)           DEFAULT '' COMMENT 'Cluster type, such as: TUBE, PULSAR, DATA_PROXY, etc',
+    `url`          varchar(512)          DEFAULT NULL COMMENT 'Cluster URL',
+    `cluster_tags` varchar(512)          DEFAULT NULL COMMENT 'Cluster tag, separated by commas',
+    `ext_tag`      varchar(128)          DEFAULT NULL COMMENT 'Extension tag, for extended use',
+    `token`        varchar(512)          DEFAULT NULL COMMENT 'Cluster token',
+    `ext_params`   text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+    `heartbeat`    text                  DEFAULT NULL COMMENT 'Cluster heartbeat info',
+    `in_charges`   varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+    `status`       int(4)                DEFAULT '0' COMMENT 'Cluster status',
+    `is_deleted`   int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`      varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`     varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_cluster_index` (`name`, `type`, `cluster_tag`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`)
 );
 
 -- ----------------------------
@@ -100,18 +119,18 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `parent_id`   int(11)      NOT NULL COMMENT 'Id of the parent cluster',
-    `type`        varchar(20)       DEFAULT '' COMMENT 'Cluster type, such as: DATA_PROXY, AGENT, etc',
+    `type`        varchar(20)  NOT NULL COMMENT 'Cluster type, such as: DATA_PROXY, AGENT, etc',
     `ip`          varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
     `port`        int(6)       NULL COMMENT 'Cluster port',
-    `ext_params`  text              DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
-    `status`      int(4)            DEFAULT '0' COMMENT 'Cluster status',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `ext_params`  text                  DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
+    `status`      int(4)                DEFAULT '0' COMMENT 'Cluster status',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
 );
 
 -- ----------------------------
@@ -121,20 +140,20 @@ CREATE TABLE IF NOT EXISTS `data_node`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `name`        varchar(128) NOT NULL COMMENT 'Node name',
-    `type`        varchar(20)       DEFAULT '' COMMENT 'Node type, such as: MYSQL, HIVE, KAFKA, ES, etc',
-    `url`         varchar(512)      DEFAULT NULL COMMENT 'Node URL',
-    `username`    varchar(128)      DEFAULT NULL COMMENT 'Username for node if needed',
-    `token`       varchar(512)      DEFAULT NULL COMMENT 'Node token',
-    `ext_params`  text              DEFAULT NULL COMMENT 'Extended params, will saved as JSON string',
+    `type`        varchar(20)           DEFAULT '' COMMENT 'Node type, such as: MYSQL, HIVE, KAFKA, ES, etc',
+    `url`         varchar(512)          DEFAULT NULL COMMENT 'Node URL',
+    `username`    varchar(128)          DEFAULT NULL COMMENT 'Username for node if needed',
+    `token`       varchar(512)          DEFAULT NULL COMMENT 'Node token',
+    `ext_params`  text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
     `in_charges`  varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`      int(4)            DEFAULT '0' COMMENT 'Cluster status',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `status`      int(4)                DEFAULT '0' COMMENT 'Node status',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_data_node_index` (`name`, `type`, `is_deleted`)
+    UNIQUE KEY `unique_data_node` (`name`, `type`, `is_deleted`)
 );
 
 -- ----------------------------
@@ -189,7 +208,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_cmd_config`
     `create_time`         timestamp   NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`         timestamp   NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `result_info`         varchar(64)      DEFAULT NULL,
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
 );
 
 -- ----------------------------
@@ -200,27 +220,27 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
     `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Owning inlong group id',
     `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id, non-deleted globally unique',
-    `name`             varchar(64)       DEFAULT NULL COMMENT 'The name of the inlong stream page display, can be Chinese',
-    `description`      varchar(256)      DEFAULT '' COMMENT 'Introduction to inlong stream',
-    `mq_resource`      varchar(128)      DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of Tube, corresponding to the topic of Pulsar',
-    `data_type`        varchar(20)       DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
-    `data_encoding`    varchar(8)        DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
-    `data_separator`   varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
-    `data_escape_char` varchar(8)        DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
-    `sync_send`        tinyint(1)        DEFAULT '0' COMMENT 'order_preserving 0: none, 1: yes',
-    `daily_records`    int(11)           DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
-    `daily_storage`    int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
-    `peak_records`     int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
-    `max_length`       int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
-    `storage_period`   int(11)           DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
-    `ext_params`       text              DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
-    `status`           int(4)            DEFAULT '100' COMMENT 'Inlong stream status',
-    `previous_status`  int(4)            DEFAULT '100' COMMENT 'Previous status',
-    `is_deleted`       int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `creator`          varchar(64)       DEFAULT NULL COMMENT 'Creator name',
-    `modifier`         varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `name`             varchar(64)           DEFAULT NULL COMMENT 'The name of the inlong stream page display, can be Chinese',
+    `description`      varchar(256)          DEFAULT '' COMMENT 'Introduction to inlong stream',
+    `mq_resource`      varchar(128)          DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of Tube, corresponding to the topic of Pulsar',
+    `data_type`        varchar(20)           DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
+    `data_encoding`    varchar(8)            DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
+    `data_separator`   varchar(8)            DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_escape_char` varchar(8)            DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
+    `sync_send`        tinyint(1)            DEFAULT '0' COMMENT 'order_preserving 0: none, 1: yes',
+    `daily_records`    int(11)               DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`    int(11)               DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`     int(11)               DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`       int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+    `storage_period`   int(11)               DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
+    `ext_params`       text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+    `status`           int(4)                DEFAULT '100' COMMENT 'Inlong stream status',
+    `previous_status`  int(4)                DEFAULT '100' COMMENT 'Previous status',
+    `is_deleted`       int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`          varchar(64)           DEFAULT NULL COMMENT 'Creator name',
+    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
 );
@@ -238,8 +258,8 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
     `is_deleted`       int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
-    KEY `index_stream_id` (`inlong_group_id`, `inlong_stream_id`)
+    UNIQUE KEY `unique_inlong_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
+    KEY `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -262,7 +282,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY `index_field_stream_id` (`inlong_stream_id`)
+    KEY `field_stream_id_index` (`inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -302,8 +322,8 @@ CREATE TABLE IF NOT EXISTS `role`
     `update_by`   varchar(256) NOT NULL,
     `disabled`    tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_role_code_idx` (`role_code`),
-    UNIQUE KEY `unique_role_name_idx` (`role_name`)
+    UNIQUE KEY `unique_role_code` (`role_code`),
+    UNIQUE KEY `unique_role_name` (`role_name`)
 );
 
 -- ----------------------------
@@ -366,8 +386,8 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `id`                 int(11)      NOT NULL AUTO_INCREMENT COMMENT 'ID',
     `inlong_group_id`    varchar(256) NOT NULL COMMENT 'Inlong group id',
     `inlong_stream_id`   varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `source_type`        varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
     `source_name`        varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
+    `source_type`        varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
     `agent_ip`           varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
     `uuid`               varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`     varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
@@ -382,12 +402,12 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `is_deleted`         int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`            varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`           varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`        timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`        timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`        timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`        timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
-    KEY `source_status_idx` (`status`, `is_deleted`),
-    KEY `source_agent_ip_idx` (`agent_ip`, `is_deleted`)
+    KEY `source_status_index` (`status`, `is_deleted`),
+    KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`)
 );
 
 -- ----------------------------
@@ -429,15 +449,15 @@ CREATE TABLE IF NOT EXISTS `stream_sink`
     `data_node_name`         varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `sort_task_name`         varchar(512)          DEFAULT NULL COMMENT 'Sort task name or task ID',
     `sort_consumer_group`    varchar(512)          DEFAULT NULL COMMENT 'Consumer group name for Sort task',
-    `ext_params`             text COMMENT 'Another fields, will be saved as JSON type',
-    `operate_log`            varchar(5000)         DEFAULT NULL COMMENT 'Background operate log',
+    `ext_params`             text         NULL COMMENT 'Another fields, will be saved as JSON type',
+    `operate_log`            text                  DEFAULT NULL COMMENT 'Background operate log',
     `status`                 int(11)               DEFAULT '100' COMMENT 'Status',
     `previous_status`        int(11)               DEFAULT '100' COMMENT 'Previous status',
     `is_deleted`             int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`                varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`               varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`            timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`            timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_sink_name` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
 );
@@ -455,7 +475,7 @@ CREATE TABLE IF NOT EXISTS `stream_sink_ext`
     `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    KEY `index_sink_id` (`sink_id`)
+    KEY `sink_id_index` (`sink_id`)
 );
 
 -- ----------------------------
@@ -479,9 +499,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
     `rank_num`         smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`       int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY `index_source_id` (`source_id`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
+    KEY `source_id_index` (`source_id`)
+);
 
 -- ----------------------------
 -- Table structure for stream_transform_field
@@ -503,12 +522,12 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
     `field_format`      varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `origin_node_name`  varchar(256)  DEFAULT '' COMMENT 'Origin Node name which stream field belongs',
+    `origin_node_name`  varchar(256)  DEFAULT '' COMMENT 'Origin node name which stream field belongs',
+    -- origin_node_name (above) is the name of the source node that the transform field comes from
     `origin_field_name` varchar(50)   DEFAULT '' COMMENT 'Origin field name before transform operation',
     PRIMARY KEY (`id`),
-    KEY `index_transform_id` (`transform_id`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform field table';
+    KEY `transform_id_index` (`transform_id`)
+);
 
 -- ----------------------------
 -- Table structure for stream_sink_field
@@ -549,7 +568,7 @@ CREATE TABLE IF NOT EXISTS `user`
     `create_by`    varchar(256) NOT NULL COMMENT 'create by sb.',
     `update_by`    varchar(256)          DEFAULT NULL COMMENT 'update by sb.',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_user_name_idx` (`name`)
+    UNIQUE KEY `unique_user_name` (`name`)
 );
 
 -- ----------------------------
@@ -558,13 +577,13 @@ CREATE TABLE IF NOT EXISTS `user`
 CREATE TABLE IF NOT EXISTS `user_role`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT,
-    `user_name`   varchar(256) NOT NULL COMMENT 'username',
-    `role_code`   varchar(256) NOT NULL COMMENT 'role code',
+    `user_name`   varchar(256) NOT NULL COMMENT 'Username',
+    `role_code`   varchar(256) NOT NULL COMMENT 'User role code',
     `create_time` datetime     NOT NULL,
     `update_time` datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
     `create_by`   varchar(256) NOT NULL,
     `update_by`   varchar(256) NOT NULL,
-    `disabled`    tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
+    `disabled`    tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it disabled, 0-enabled, 1-disabled',
     PRIMARY KEY (`id`)
 );
 
@@ -621,7 +640,7 @@ CREATE TABLE IF NOT EXISTS `workflow_event_log`
     `remark`               text COMMENT 'Execution result remark information',
     `exception`            text COMMENT 'Exception information',
     PRIMARY KEY (`id`),
-    INDEX group_status_idx (`inlong_group_id`, `status`)
+    INDEX group_status_index (`inlong_group_id`, `status`)
 );
 
 -- ----------------------------
@@ -667,7 +686,7 @@ CREATE TABLE IF NOT EXISTS `workflow_task`
     `end_time`             datetime      DEFAULT NULL COMMENT 'End time',
     `ext_params`           text COMMENT 'Extended information-json',
     PRIMARY KEY (`id`),
-    INDEX process_status_idx (`process_id`, `status`)
+    INDEX process_status_index (`process_id`, `status`)
 );
 
 -- ----------------------------
@@ -709,7 +728,7 @@ CREATE TABLE IF NOT EXISTS `sort_cluster_config`
     `task_name`    varchar(128) NOT NULL COMMENT 'Task name',
     `sink_type`    varchar(128) NOT NULL COMMENT 'Type of sink',
     PRIMARY KEY (`id`),
-    KEY `index_sort_cluster_config` (`cluster_name`)
+    KEY `sort_cluster_config_index` (`cluster_name`)
 );
 
 -- ----------------------------
@@ -724,7 +743,7 @@ CREATE TABLE IF NOT EXISTS `sort_task_id_param`
     `param_key`   varchar(128)  NOT NULL COMMENT 'Key of param',
     `param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
     PRIMARY KEY (`id`),
-    KEY `index_sort_task_id_param` (`task_name`)
+    KEY `sort_task_id_param_index` (`task_name`)
 );
 
 -- ----------------------------
@@ -738,7 +757,7 @@ CREATE TABLE IF NOT EXISTS `sort_task_sink_param`
     `param_key`   varchar(128)  NOT NULL COMMENT 'Key of param',
     `param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
     PRIMARY KEY (`id`),
-    KEY `index_sort_task_sink_params` (`task_name`, `sink_type`)
+    KEY `sort_task_sink_params_index` (`task_name`, `sink_type`)
 );
 
 -- ----------------------------
@@ -753,7 +772,7 @@ CREATE TABLE IF NOT EXISTS `sort_source_config`
     `topic`        varchar(128) DEFAULT '' COMMENT 'Topic',
     `ext_params`   text         DEFAULT NULL COMMENT 'Another fields, will be saved as JSON type',
     PRIMARY KEY (`id`),
-    KEY `index_sort_source_config` (`cluster_name`, `task_name`)
+    KEY `sort_source_config_index` (`cluster_name`, `task_name`)
 );
 
 
@@ -785,7 +804,7 @@ CREATE TABLE IF NOT EXISTS `component_heartbeat`
     `instance`         varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
     `status_heartbeat` text        NOT NULL COMMENT 'Status heartbeat info',
     `metric_heartbeat` text        NOT NULL COMMENT 'Metric heartbeat info',
-    `report_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Report time',
+    `report_time`      bigint(20)  NOT NULL COMMENT 'Report time',
     `create_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
@@ -803,7 +822,7 @@ CREATE TABLE IF NOT EXISTS `group_heartbeat`
     `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
     `status_heartbeat` text         NOT NULL COMMENT 'Status heartbeat info',
     `metric_heartbeat` text         NOT NULL COMMENT 'Metric heartbeat info',
-    `report_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Report time',
+    `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
     `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
@@ -822,7 +841,7 @@ CREATE TABLE IF NOT EXISTS `stream_heartbeat`
     `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
     `status_heartbeat` text         NOT NULL COMMENT 'Status heartbeat info',
     `metric_heartbeat` text         NOT NULL COMMENT 'Metric heartbeat info',
-    `report_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Report time',
+    `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
     `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 0a4a2727a..d5a94d94f 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -55,7 +55,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `modify_time`            timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
-    INDEX group_status_deleted_idx (`status`, `is_deleted`)
+    INDEX group_status_deleted_index (`status`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong group table';
 
@@ -71,34 +71,54 @@ CREATE TABLE IF NOT EXISTS `inlong_group_ext`
     `is_deleted`      int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `modify_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    KEY `index_group_id` (`inlong_group_id`),
-    UNIQUE KEY `unique_group_key` (`inlong_group_id`, `key_name`)
+    KEY `group_id_index` (`inlong_group_id`),
+    UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong group extension table';
 
 -- ----------------------------
--- Table structure for inlong_cluster
+-- Table structure for inlong_cluster_tag
 -- ----------------------------
-CREATE TABLE IF NOT EXISTS `inlong_cluster`
+CREATE TABLE IF NOT EXISTS `inlong_cluster_tag`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `name`        varchar(128) NOT NULL COMMENT 'Cluster name',
-    `type`        varchar(20)       DEFAULT '' COMMENT 'Cluster type, such as: TUBE, PULSAR, DATA_PROXY, etc',
-    `url`         varchar(512)      DEFAULT NULL COMMENT 'Cluster URL',
-    `cluster_tag` varchar(128)      DEFAULT NULL COMMENT 'Cluster tag, the same tab indicates that cluster belongs to the same set',
-    `ext_tag`     varchar(128)      DEFAULT NULL COMMENT 'Extension tag, for extended use',
-    `token`       varchar(512)      DEFAULT NULL COMMENT 'Cluster token',
-    `ext_params`  text              DEFAULT NULL COMMENT 'Extended params, will saved as JSON string',
-    `heartbeat`   text              DEFAULT NULL COMMENT 'Cluster heartbeat info',
+    `cluster_tag` varchar(128) NOT NULL COMMENT 'Cluster tag',
+    `ext_params`  text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
     `in_charges`  varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`      int(4)            DEFAULT '0' COMMENT 'Cluster status',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `status`      int(4)                DEFAULT '0' COMMENT 'Cluster status',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_inlong_cluster_tag` (`cluster_tag`, `is_deleted`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster tag table';
+
+-- ----------------------------
+-- Table structure for inlong_cluster
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_cluster`
+(
+    `id`           int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `name`         varchar(128) NOT NULL COMMENT 'Cluster name',
+    `type`         varchar(20)           DEFAULT '' COMMENT 'Cluster type, such as: TUBE, PULSAR, DATA_PROXY, etc',
+    `url`          varchar(512)          DEFAULT NULL COMMENT 'Cluster URL',
+    `cluster_tags` varchar(512)          DEFAULT NULL COMMENT 'Cluster tag, separated by commas',
+    `ext_tag`      varchar(128)          DEFAULT NULL COMMENT 'Extension tag, for extended use',
+    `token`        varchar(512)          DEFAULT NULL COMMENT 'Cluster token',
+    `ext_params`   text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+    `heartbeat`    text                  DEFAULT NULL COMMENT 'Cluster heartbeat info',
+    `in_charges`   varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+    `status`       int(4)                DEFAULT '0' COMMENT 'Cluster status',
+    `is_deleted`   int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`      varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`     varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_cluster_index` (`name`, `type`, `cluster_tag`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster table';
 
@@ -109,18 +129,18 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `parent_id`   int(11)      NOT NULL COMMENT 'Id of the parent cluster',
-    `type`        varchar(20)       DEFAULT '' COMMENT 'Cluster type, such as: DATA_PROXY, AGENT, etc',
-    `ip`          varchar(512) NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
+    `type`        varchar(20)  NOT NULL COMMENT 'Cluster type, such as: DATA_PROXY, AGENT, etc',
+    `ip`          varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
     `port`        int(6)       NULL COMMENT 'Cluster port',
-    `ext_params`  text              DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
-    `status`      int(4)            DEFAULT '0' COMMENT 'Cluster status',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `ext_params`  text                  DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
+    `status`      int(4)                DEFAULT '0' COMMENT 'Cluster status',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
+    UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster node table';
 
@@ -131,20 +151,20 @@ CREATE TABLE IF NOT EXISTS `data_node`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `name`        varchar(128) NOT NULL COMMENT 'Node name',
-    `type`        varchar(20)       DEFAULT '' COMMENT 'Node type, such as: MYSQL, HIVE, KAFKA, ES, etc',
-    `url`         varchar(512)      DEFAULT NULL COMMENT 'Node URL',
-    `username`    varchar(128)      DEFAULT NULL COMMENT 'Username for node',
-    `token`       varchar(512)      DEFAULT NULL COMMENT 'Node token',
-    `ext_params`  text              DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+    `type`        varchar(20)           DEFAULT '' COMMENT 'Node type, such as: MYSQL, HIVE, KAFKA, ES, etc',
+    `url`         varchar(512)          DEFAULT NULL COMMENT 'Node URL',
+    `username`    varchar(128)          DEFAULT NULL COMMENT 'Username for node if needed',
+    `token`       varchar(512)          DEFAULT NULL COMMENT 'Node token',
+    `ext_params`  text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
     `in_charges`  varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`      int(4)            DEFAULT '0' COMMENT 'Node status',
-    `is_deleted`  int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `status`      int(4)                DEFAULT '0' COMMENT 'Node status',
+    `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_data_node_index` (`name`, `type`, `is_deleted`)
+    UNIQUE KEY `unique_data_node` (`name`, `type`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data node table';
 
@@ -157,7 +177,7 @@ CREATE TABLE IF NOT EXISTS `consumption`
     `consumer_group`   varchar(256) NOT NULL COMMENT 'Consumer group',
     `in_charges`       varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
     `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `mq_type`          varchar(10)       DEFAULT 'TUBE' COMMENT 'Message queue, high throughput: TUBE, high consistency: PULSAR',
+    `mq_type`          varchar(10)       DEFAULT 'TUBE' COMMENT 'Message queue type, high throughput: TUBE, high consistency: PULSAR',
     `topic`            varchar(256) NOT NULL COMMENT 'Consumption topic',
     `filter_enabled`   int(2)            DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
     `inlong_stream_id` varchar(256)      DEFAULT NULL COMMENT 'Inlong stream ID for consumption, if filter_enable is 1, it cannot empty',
@@ -203,7 +223,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_cmd_config`
     `modify_time`         timestamp   NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `result_info`         varchar(64)      DEFAULT NULL,
     PRIMARY KEY (`id`),
-    KEY `index_1` (`task_id`, `bSend`, `specified_data_time`)
+    KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8;
 
@@ -215,27 +235,27 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
     `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Owning inlong group id',
     `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id, non-deleted globally unique',
-    `name`             varchar(64)       DEFAULT NULL COMMENT 'The name of the inlong stream page display, can be Chinese',
-    `description`      varchar(256)      DEFAULT '' COMMENT 'Introduction to inlong stream',
-    `mq_resource`      varchar(128)      DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of Tube, corresponding to the topic of Pulsar',
-    `data_type`        varchar(20)       DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
-    `data_encoding`    varchar(8)        DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
-    `data_separator`   varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
-    `data_escape_char` varchar(8)        DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
-    `sync_send`        tinyint(1)        DEFAULT '0' COMMENT 'order_preserving 0: none, 1: yes',
-    `daily_records`    int(11)           DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
-    `daily_storage`    int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
-    `peak_records`     int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
-    `max_length`       int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
-    `storage_period`   int(11)           DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
-    `ext_params`       text              DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
-    `status`           int(4)            DEFAULT '100' COMMENT 'Inlong stream status',
-    `previous_status`  int(4)            DEFAULT '100' COMMENT 'Previous status',
-    `is_deleted`       int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `creator`          varchar(64)       DEFAULT NULL COMMENT 'Creator name',
-    `modifier`         varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `name`             varchar(64)           DEFAULT NULL COMMENT 'The name of the inlong stream page display, can be Chinese',
+    `description`      varchar(256)          DEFAULT '' COMMENT 'Introduction to inlong stream',
+    `mq_resource`      varchar(128)          DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of Tube, corresponding to the topic of Pulsar',
+    `data_type`        varchar(20)           DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
+    `data_encoding`    varchar(8)            DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
+    `data_separator`   varchar(8)            DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_escape_char` varchar(8)            DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
+    `sync_send`        tinyint(1)            DEFAULT '0' COMMENT 'order_preserving 0: none, 1: yes',
+    `daily_records`    int(11)               DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`    int(11)               DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`     int(11)               DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`       int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+    `storage_period`   int(11)               DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
+    `ext_params`       text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+    `status`           int(4)                DEFAULT '100' COMMENT 'Inlong stream status',
+    `previous_status`  int(4)                DEFAULT '100' COMMENT 'Previous status',
+    `is_deleted`       int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`          varchar(64)           DEFAULT NULL COMMENT 'Creator name',
+    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
 ) ENGINE = InnoDB
@@ -254,8 +274,8 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
     `is_deleted`       int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
-    KEY `index_stream_id` (`inlong_group_id`, `inlong_stream_id`)
+    UNIQUE KEY `unique_inlong_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
+    KEY `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream extension table';
 
@@ -279,7 +299,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
     `rank_num`            smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`          int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY `index_field_stream_id` (`inlong_stream_id`)
+    KEY `field_stream_id_index` (`inlong_stream_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='File/DB data source field table';
 
@@ -321,8 +341,8 @@ CREATE TABLE IF NOT EXISTS `role`
     `update_by`   varchar(256) NOT NULL,
     `disabled`    tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_role_code_idx` (`role_code`),
-    UNIQUE KEY `unique_role_name_idx` (`role_name`)
+    UNIQUE KEY `unique_role_code` (`role_code`),
+    UNIQUE KEY `unique_role_name` (`role_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Role Table';
 
@@ -404,12 +424,12 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `is_deleted`         int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`            varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`           varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`        timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`        timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`        timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`        timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
-    KEY `source_status_idx` (`status`, `is_deleted`),
-    KEY `source_agent_ip_idx` (`agent_ip`, `is_deleted`)
+    KEY `source_status_index` (`status`, `is_deleted`),
+    KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
 
@@ -433,7 +453,7 @@ CREATE TABLE IF NOT EXISTS `stream_transform`
     `create_time`          timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`          timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_transform_name` (`inlong_group_id`, `inlong_stream_id`, `transform_name`, `is_deleted`) USING BTREE
+    UNIQUE KEY `unique_transform_name` (`inlong_group_id`, `inlong_stream_id`, `transform_name`, `is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform table';
 
@@ -460,8 +480,8 @@ CREATE TABLE IF NOT EXISTS `stream_sink`
     `is_deleted`             int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `creator`                varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`               varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`            timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`            timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_sink_name` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
 ) ENGINE = InnoDB
@@ -480,7 +500,7 @@ CREATE TABLE IF NOT EXISTS `stream_sink_ext`
     `is_deleted`  int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    KEY `index_sink_id` (`sink_id`)
+    KEY `sink_id_index` (`sink_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink extension table';
 
@@ -505,7 +525,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
     `rank_num`         smallint(6)  DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`       int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY `index_source_id` (`source_id`)
+    KEY `source_id_index` (`source_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
 
@@ -533,7 +553,7 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
     -- The source node name of the transport field
     `origin_field_name` varchar(50)   DEFAULT '' COMMENT 'Origin field name before transform operation',
     PRIMARY KEY (`id`),
-    KEY `index_transform_id` (`transform_id`)
+    KEY `transform_id_index` (`transform_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform field table';
 
@@ -577,7 +597,7 @@ CREATE TABLE IF NOT EXISTS `user`
     `create_by`    varchar(256) NOT NULL COMMENT 'create by sb.',
     `update_by`    varchar(256)          DEFAULT NULL COMMENT 'update by sb.',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_user_name_idx` (`name`)
+    UNIQUE KEY `unique_user_name` (`name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='User table';
 
@@ -592,13 +612,13 @@ VALUES ('admin', '628ed559bff5ae36bd2184d4216973cf', 0, '2099-12-31 23:59:59',
 CREATE TABLE IF NOT EXISTS `user_role`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT,
-    `user_name`   varchar(256) NOT NULL COMMENT 'username rtx',
-    `role_code`   varchar(256) NOT NULL COMMENT 'role',
+    `user_name`   varchar(256) NOT NULL COMMENT 'Username',
+    `role_code`   varchar(256) NOT NULL COMMENT 'User role code',
     `create_time` datetime     NOT NULL,
     `update_time` datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
     `create_by`   varchar(256) NOT NULL,
     `update_by`   varchar(256) NOT NULL,
-    `disabled`    tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
+    `disabled`    tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it disabled, 0-enabled, 1-disabled',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='User Role Table';
@@ -657,7 +677,7 @@ CREATE TABLE IF NOT EXISTS `workflow_event_log`
     `remark`               text COMMENT 'Execution result remark information',
     `exception`            text COMMENT 'Exception information',
     PRIMARY KEY (`id`),
-    INDEX group_status_idx (`inlong_group_id`, `status`)
+    INDEX group_status_index (`inlong_group_id`, `status`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow event log table';
 
@@ -705,7 +725,7 @@ CREATE TABLE IF NOT EXISTS `workflow_task`
     `end_time`             datetime      DEFAULT NULL COMMENT 'End time',
     `ext_params`           text COMMENT 'Extended information-json',
     PRIMARY KEY (`id`),
-    INDEX process_status_idx (`process_id`, `status`)
+    INDEX process_status_index (`process_id`, `status`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow task table';
 
@@ -749,7 +769,7 @@ CREATE TABLE IF NOT EXISTS `sort_cluster_config`
     `task_name`    varchar(128) NOT NULL COMMENT 'Task name',
     `sink_type`    varchar(128) NOT NULL COMMENT 'Type of sink',
     PRIMARY KEY (`id`),
-    KEY `index_sort_cluster_config` (`cluster_name`)
+    KEY `sort_cluster_config_index` (`cluster_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Sort cluster config table';
 
@@ -765,7 +785,7 @@ CREATE TABLE IF NOT EXISTS `sort_task_id_param`
     `param_key`   varchar(128)  NOT NULL COMMENT 'Key of param',
     `param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
     PRIMARY KEY (`id`),
-    KEY `index_sort_task_id_param` (`task_name`)
+    KEY `sort_task_id_param_index` (`task_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Sort task id params table';
 
@@ -780,7 +800,7 @@ CREATE TABLE IF NOT EXISTS `sort_task_sink_param`
     `param_key`   varchar(128)  NOT NULL COMMENT 'Key of param',
     `param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
     PRIMARY KEY (`id`),
-    KEY `index_sort_task_sink_params` (`task_name`, `sink_type`)
+    KEY `sort_task_sink_params_index` (`task_name`, `sink_type`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Sort task sink params table';
 
@@ -796,7 +816,7 @@ CREATE TABLE IF NOT EXISTS `sort_source_config`
     `topic`        varchar(128) DEFAULT '' COMMENT 'Topic',
     `ext_params`   text         DEFAULT NULL COMMENT 'Another fields, will be saved as JSON type',
     PRIMARY KEY (`id`),
-    KEY `index_sort_source_config` (`cluster_name`, `task_name`)
+    KEY `sort_source_config_index` (`cluster_name`, `task_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Sort source config table';
 
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index 23a908e6b..93d661dbc 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -25,15 +25,19 @@ import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagResponse;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
-import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
 import org.apache.inlong.manager.common.pojo.user.UserRoleCode;
 import org.apache.inlong.manager.common.util.LoginUserUtils;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.operationlog.OperationLog;
 import org.apache.shiro.authz.annotation.RequiresRoles;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -47,44 +51,83 @@ import org.springframework.web.bind.annotation.RestController;
  * Inlong cluster controller
  */
 @RestController
-@RequestMapping("/cluster")
 @Api(tags = "Inlong-Cluster-API")
 public class InlongClusterController {
 
     @Autowired
     private InlongClusterService clusterService;
 
-    @PostMapping(value = "/save")
+    @PostMapping(value = "/cluster/tag/save")
+    @ApiOperation(value = "Save cluster tag")
+    @OperationLog(operation = OperationType.CREATE)
+    @RequiresRoles(value = UserRoleCode.ADMIN)
+    public Response<Integer> saveTag(@Validated @RequestBody ClusterTagRequest request) {
+        String currentUser = LoginUserUtils.getLoginUserDetail().getUsername();
+        return Response.success(clusterService.saveTag(request, currentUser));
+    }
+
+    @GetMapping(value = "/cluster/tag/get/{id}")
+    @ApiOperation(value = "Get cluster tag by id")
+    @ApiImplicitParam(name = "id", value = "Cluster ID", dataTypeClass = Integer.class, required = true)
+    public Response<ClusterTagResponse> getTag(@PathVariable Integer id) {
+        return Response.success(clusterService.getTag(id));
+    }
+
+    @PostMapping(value = "/cluster/tag/list")
+    @ApiOperation(value = "List cluster tags")
+    public Response<PageInfo<ClusterTagResponse>> listTag(@RequestBody ClusterTagPageRequest request) {
+        request.setCurrentUser(LoginUserUtils.getLoginUserDetail().getUsername());
+        return Response.success(clusterService.listTag(request));
+    }
+
+    @PostMapping(value = "/cluster/tag/update")
+    @OperationLog(operation = OperationType.UPDATE)
+    @ApiOperation(value = "Update cluster tag")
+    public Response<Boolean> updateTag(@Validated @RequestBody ClusterTagRequest request) {
+        String username = LoginUserUtils.getLoginUserDetail().getUsername();
+        return Response.success(clusterService.updateTag(request, username));
+    }
+
+    @DeleteMapping(value = "/cluster/tag/delete/{id}")
+    @ApiOperation(value = "Delete cluster tag by id")
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiImplicitParam(name = "id", value = "Cluster ID", dataTypeClass = Integer.class, required = true)
+    @RequiresRoles(value = UserRoleCode.ADMIN)
+    public Response<Boolean> deleteTag(@PathVariable Integer id) {
+        return Response.success(clusterService.deleteTag(id, LoginUserUtils.getLoginUserDetail().getUsername()));
+    }
+
+    @PostMapping(value = "/cluster/save")
     @ApiOperation(value = "Save cluster")
     @OperationLog(operation = OperationType.CREATE)
     @RequiresRoles(value = UserRoleCode.ADMIN)
-    public Response<Integer> save(@RequestBody InlongClusterRequest request) {
+    public Response<Integer> save(@Validated @RequestBody InlongClusterRequest request) {
         String currentUser = LoginUserUtils.getLoginUserDetail().getUsername();
         return Response.success(clusterService.save(request, currentUser));
     }
 
-    @GetMapping(value = "/get/{id}")
+    @GetMapping(value = "/cluster/get/{id}")
     @ApiOperation(value = "Get cluster by id")
     @ApiImplicitParam(name = "id", value = "Cluster ID", dataTypeClass = Integer.class, required = true)
     public Response<InlongClusterInfo> get(@PathVariable Integer id) {
         return Response.success(clusterService.get(id));
     }
 
-    @PostMapping(value = "/list")
+    @PostMapping(value = "/cluster/list")
     @ApiOperation(value = "List clusters")
     public Response<PageInfo<InlongClusterInfo>> list(@RequestBody InlongClusterPageRequest request) {
         return Response.success(clusterService.list(request));
     }
 
-    @PostMapping(value = "/update")
+    @PostMapping(value = "/cluster/update")
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update cluster")
-    public Response<Boolean> update(@RequestBody InlongClusterRequest request) {
+    public Response<Boolean> update(@Validated @RequestBody InlongClusterRequest request) {
         String username = LoginUserUtils.getLoginUserDetail().getUsername();
         return Response.success(clusterService.update(request, username));
     }
 
-    @DeleteMapping(value = "/delete/{id}")
+    @DeleteMapping(value = "/cluster/delete/{id}")
     @ApiOperation(value = "Delete cluster by id")
     @OperationLog(operation = OperationType.DELETE)
     @ApiImplicitParam(name = "id", value = "Cluster ID", dataTypeClass = Integer.class, required = true)
@@ -93,36 +136,36 @@ public class InlongClusterController {
         return Response.success(clusterService.delete(id, LoginUserUtils.getLoginUserDetail().getUsername()));
     }
 
-    @PostMapping(value = "/node/save")
+    @PostMapping(value = "/cluster/node/save")
     @ApiOperation(value = "Save cluster node")
     @OperationLog(operation = OperationType.CREATE)
-    public Response<Integer> saveNode(@RequestBody ClusterNodeRequest request) {
+    public Response<Integer> saveNode(@Validated @RequestBody ClusterNodeRequest request) {
         String currentUser = LoginUserUtils.getLoginUserDetail().getUsername();
         return Response.success(clusterService.saveNode(request, currentUser));
     }
 
-    @GetMapping(value = "/node/get/{id}")
+    @GetMapping(value = "/cluster/node/get/{id}")
     @ApiOperation(value = "Get cluster node by id")
     @ApiImplicitParam(name = "id", value = "Cluster node ID", dataTypeClass = Integer.class, required = true)
     public Response<ClusterNodeResponse> getNode(@PathVariable Integer id) {
         return Response.success(clusterService.getNode(id));
     }
 
-    @PostMapping(value = "/node/list")
+    @PostMapping(value = "/cluster/node/list")
     @ApiOperation(value = "List cluster nodes")
     public Response<PageInfo<ClusterNodeResponse>> listNode(@RequestBody InlongClusterPageRequest request) {
         return Response.success(clusterService.listNode(request));
     }
 
-    @RequestMapping(value = "/node/update", method = RequestMethod.POST)
+    @RequestMapping(value = "/cluster/node/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update cluster node")
-    public Response<Boolean> updateNode(@RequestBody ClusterNodeRequest request) {
+    public Response<Boolean> updateNode(@Validated @RequestBody ClusterNodeRequest request) {
         String username = LoginUserUtils.getLoginUserDetail().getUsername();
         return Response.success(clusterService.updateNode(request, username));
     }
 
-    @RequestMapping(value = "/node/delete/{id}", method = RequestMethod.DELETE)
+    @RequestMapping(value = "/cluster/node/delete/{id}", method = RequestMethod.DELETE)
     @ApiOperation(value = "Delete cluster node")
     @OperationLog(operation = OperationType.DELETE)
     @ApiImplicitParam(name = "id", value = "Cluster node id", dataTypeClass = Integer.class, required = true)