You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/29 10:54:28 UTC
[inlong] branch master updated: [INLONG-4796][Manager] Support management of Inlong cluster tag (#4800)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3b3ae294b [INLONG-4796][Manager] Support management of Inlong cluster tag (#4800)
3b3ae294b is described below
commit 3b3ae294bdb0f6c859a243ec321a685e33de1344
Author: healzhou <he...@gmail.com>
AuthorDate: Wed Jun 29 18:54:22 2022 +0800
[INLONG-4796][Manager] Support management of Inlong cluster tag (#4800)
---
.../client/api/inner/InnerInlongManagerClient.java | 2 +-
.../api/inner/InnerInlongManagerClientTest.java | 4 +-
.../pojo/cluster/ClusterTagPageRequest.java} | 47 +++--
.../common/pojo/cluster/ClusterTagRequest.java} | 36 ++--
...ongClusterInfo.java => ClusterTagResponse.java} | 37 +---
.../common/pojo/cluster/InlongClusterInfo.java | 4 +-
.../common/pojo/cluster/InlongClusterRequest.java | 4 +-
.../common/pojo/dataproxy/CacheCluster.java | 12 +-
.../manager/dao/entity/InlongClusterEntity.java | 2 +-
...sterEntity.java => InlongClusterTagEntity.java} | 15 +-
.../dao/mapper/InlongClusterEntityMapper.java | 8 +-
.../dao/mapper/InlongClusterNodeEntityMapper.java | 2 -
...pper.java => InlongClusterTagEntityMapper.java} | 24 +--
.../main/resources/mappers/ClusterSetMapper.xml | 22 +--
.../mappers/InlongClusterEntityMapper.xml | 162 +++-------------
.../mappers/InlongClusterNodeEntityMapper.xml | 80 --------
.../mappers/InlongClusterTagEntityMapper.xml | 132 +++++++++++++
.../service/cluster/AbstractClusterOperator.java | 5 +-
.../service/cluster/InlongClusterService.java | 46 +++++
.../service/cluster/InlongClusterServiceImpl.java | 119 +++++++++++-
.../service/core/impl/DataNodeServiceImpl.java | 7 +-
.../repository/DataProxyConfigRepository.java | 2 +-
.../service/sink/StreamSinkServiceImpl.java | 2 +-
.../service/source/StreamSourceServiceImpl.java | 2 +-
.../service/cluster/InlongClusterServiceTest.java | 6 +-
.../main/resources/h2/apache_inlong_manager.sql | 213 +++++++++++----------
.../manager-web/sql/apache_inlong_manager.sql | 194 ++++++++++---------
.../web/controller/InlongClusterController.java | 75 ++++++--
28 files changed, 694 insertions(+), 570 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 2b711e25b..d3936b2aa 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -135,7 +135,7 @@ public class InnerInlongManagerClient {
public Integer saveCluster(InlongClusterRequest request) {
AssertUtils.notEmpty(request.getName(), "cluster name should not be empty");
AssertUtils.notEmpty(request.getType(), "cluster type should not be empty");
- AssertUtils.notEmpty(request.getClusterTag(), "cluster tag should not be empty");
+ AssertUtils.notEmpty(request.getClusterTags(), "cluster tags should not be empty");
Response<Integer> clusterIndexResponse = executeHttpCall(inlongClusterApi.save(request));
assertRespSuccess(clusterIndexResponse);
return clusterIndexResponse.getData();
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
index ba7d84bc1..89b04ca41 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
@@ -680,9 +680,9 @@ class InnerInlongManagerClientTest {
InlongClusterRequest request = new InlongClusterRequest();
request.setName("pulsar");
request.setType("PULSAR");
- request.setClusterTag("test_cluster");
+ request.setClusterTags("test_cluster");
Integer clusterIndex = innerInlongManagerClient.saveCluster(request);
- Assertions.assertTrue(clusterIndex == 1);
+ Assertions.assertEquals(1, (int) clusterIndex);
}
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagPageRequest.java
similarity index 52%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagPageRequest.java
index 27f237083..3535e6fe7 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagPageRequest.java
@@ -15,38 +15,35 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.cluster;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Date;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.beans.PageRequest;
/**
- * Inlong cluster entity, including name, type, cluster tag, etc.
+ * Cluster tag paging query conditions
*/
@Data
-public class InlongClusterEntity implements Serializable {
-
- private static final long serialVersionUID = 1L;
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Cluster tag paging query request")
+public class ClusterTagPageRequest extends PageRequest {
- private Integer id;
- private String name;
- private String type;
- private String url;
- private String clusterTag;
- private String extTag;
- private String token;
-
- private String extParams;
- private String heartbeat;
- private String inCharges;
+ @ApiModelProperty(value = "Keywords, used for fuzzy query")
+ private String keyword;
+ @ApiModelProperty(value = "Status")
private Integer status;
- private Integer isDeleted;
- private String creator;
- private String modifier;
- private Date createTime;
- private Date modifyTime;
-}
\ No newline at end of file
+ @ApiModelProperty(value = "Current user", hidden = true)
+ private String currentUser;
+
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagRequest.java
similarity index 61%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagRequest.java
index 27f237083..ef2ffd7d2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagRequest.java
@@ -15,38 +15,32 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.dao.entity;
+package org.apache.inlong.manager.common.pojo.cluster;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import java.io.Serializable;
-import java.util.Date;
+import javax.validation.constraints.NotNull;
/**
- * Inlong cluster entity, including name, type, cluster tag, etc.
+ * Inlong cluster tag request
*/
@Data
-public class InlongClusterEntity implements Serializable {
-
- private static final long serialVersionUID = 1L;
+@ApiModel("Cluster tag request")
+public class ClusterTagRequest {
+ @ApiModelProperty(value = "Primary key")
private Integer id;
- private String name;
- private String type;
- private String url;
+
+ @NotNull
+ @ApiModelProperty(value = "Cluster type")
private String clusterTag;
- private String extTag;
- private String token;
+ @ApiModelProperty(value = "Extended params")
private String extParams;
- private String heartbeat;
- private String inCharges;
- private Integer status;
- private Integer isDeleted;
- private String creator;
- private String modifier;
- private Date createTime;
- private Date modifyTime;
+ @ApiModelProperty(value = "Name of in charges, separated by commas")
+ private String inCharges;
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagResponse.java
similarity index 61%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagResponse.java
index c34e50427..c85098997 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/ClusterTagResponse.java
@@ -18,50 +18,25 @@
package org.apache.inlong.manager.common.pojo.cluster;
import com.fasterxml.jackson.annotation.JsonFormat;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
import java.util.Date;
/**
- * Inlong cluster info
+ * Inlong cluster tag response
*/
@Data
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Inlong cluster info")
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
-public class InlongClusterInfo {
+@ApiModel("Cluster tag response")
+public class ClusterTagResponse {
@ApiModelProperty(value = "Primary key")
private Integer id;
- @ApiModelProperty(value = "Cluster name")
- private String name;
-
- @ApiModelProperty(value = "Cluster type, including TUBE, PULSAR, DATA_PROXY, etc.")
- private String type;
-
- @ApiModelProperty(value = "Cluster url")
- private String url;
-
- @ApiModelProperty(value = "Cluster type, including TUBE, PULSAR, DATA_PROXY, etc.")
+ @ApiModelProperty(value = "Cluster tag")
private String clusterTag;
- @ApiModelProperty(value = "Extension tag")
- private String extTag;
-
- @ApiModelProperty(value = "Cluster token")
- private String token;
-
- @ApiModelProperty(value = "Cluster heartbeat info")
- private String heartbeat;
-
@ApiModelProperty(value = "Extended params")
private String extParams;
@@ -83,8 +58,4 @@ public class InlongClusterInfo {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date modifyTime;
- public InlongClusterRequest genRequest() {
- return CommonBeanUtils.copyProperties(this, InlongClusterRequest::new);
- }
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
index c34e50427..690d51a75 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterInfo.java
@@ -50,8 +50,8 @@ public class InlongClusterInfo {
@ApiModelProperty(value = "Cluster url")
private String url;
- @ApiModelProperty(value = "Cluster type, including TUBE, PULSAR, DATA_PROXY, etc.")
- private String clusterTag;
+ @ApiModelProperty(value = "Cluster tags, separated by commas")
+ private String clusterTags;
@ApiModelProperty(value = "Extension tag")
private String extTag;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterRequest.java
index 70ce56ea1..9d699649c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/InlongClusterRequest.java
@@ -53,8 +53,8 @@ public class InlongClusterRequest {
private String url;
@NotBlank
- @ApiModelProperty(value = "Cluster tag")
- private String clusterTag;
+ @ApiModelProperty(value = "Cluster tags, separated by commas")
+ private String clusterTags;
@ApiModelProperty(value = "Extension tag")
private String extTag;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
index 8d71adc9b..1d130c32d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataproxy/CacheCluster.java
@@ -24,7 +24,7 @@ public class CacheCluster {
private String clusterName;
private String type;
- private String clusterTag;
+ private String clusterTags;
private String extTag;
private String extParams;
@@ -69,17 +69,17 @@ public class CacheCluster {
*
* @return the clusterTag
*/
- public String getClusterTag() {
- return clusterTag;
+ public String getClusterTags() {
+ return clusterTags;
}
/**
* set clusterTag
*
- * @param clusterTag the clusterTag to set
+ * @param clusterTags the clusterTag to set
*/
- public void setClusterTag(String clusterTag) {
- this.clusterTag = clusterTag;
+ public void setClusterTags(String clusterTags) {
+ this.clusterTags = clusterTags;
}
/**
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
index 27f237083..46b67f765 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
@@ -34,7 +34,7 @@ public class InlongClusterEntity implements Serializable {
private String name;
private String type;
private String url;
- private String clusterTag;
+ private String clusterTags;
private String extTag;
private String token;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterTagEntity.java
similarity index 81%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterTagEntity.java
index 27f237083..7bc2b379f 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterTagEntity.java
@@ -23,25 +23,16 @@ import java.io.Serializable;
import java.util.Date;
/**
- * Inlong cluster entity, including name, type, cluster tag, etc.
+ * Inlong cluster tag entity.
*/
@Data
-public class InlongClusterEntity implements Serializable {
+public class InlongClusterTagEntity implements Serializable {
private static final long serialVersionUID = 1L;
-
private Integer id;
- private String name;
- private String type;
- private String url;
private String clusterTag;
- private String extTag;
- private String token;
-
private String extParams;
- private String heartbeat;
private String inCharges;
-
private Integer status;
private Integer isDeleted;
private String creator;
@@ -49,4 +40,4 @@ public class InlongClusterEntity implements Serializable {
private Date createTime;
private Date modifyTime;
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index c25e7ff60..b42103ebd 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -29,22 +29,20 @@ public interface InlongClusterEntityMapper {
int insert(InlongClusterEntity record);
- int insertSelective(InlongClusterEntity record);
-
InlongClusterEntity selectById(Integer id);
/**
- * Select clusters by tag, name and type, the tag and name can be null.
+ * Select clusters by tags, name and type, the tag and name can be null.
*/
List<InlongClusterEntity> selectByKey(@Param("clusterTag") String clusterTag, @Param("name") String name,
@Param("type") String type);
List<InlongClusterEntity> selectByCondition(InlongClusterPageRequest request);
- int updateByIdSelective(InlongClusterEntity record);
-
int updateById(InlongClusterEntity record);
+ int updateByIdSelective(InlongClusterEntity record);
+
int deleteByPrimaryKey(Integer id);
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index 9847f05b8..7a13d9939 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -30,8 +30,6 @@ public interface InlongClusterNodeEntityMapper {
int insert(InlongClusterNodeEntity record);
- int insertSelective(InlongClusterNodeEntity record);
-
InlongClusterNodeEntity selectById(Integer id);
InlongClusterNodeEntity selectByUniqueKey(ClusterNodeRequest request);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterTagEntityMapper.java
similarity index 58%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterTagEntityMapper.java
index c25e7ff60..2d0d111bc 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterTagEntityMapper.java
@@ -18,32 +18,26 @@
package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
-public interface InlongClusterEntityMapper {
+public interface InlongClusterTagEntityMapper {
- int insert(InlongClusterEntity record);
+ int insert(InlongClusterTagEntity record);
- int insertSelective(InlongClusterEntity record);
+ InlongClusterTagEntity selectById(Integer id);
- InlongClusterEntity selectById(Integer id);
+ InlongClusterTagEntity selectByTag(@Param("clusterTag") String clusterTag);
- /**
- * Select clusters by tag, name and type, the tag and name can be null.
- */
- List<InlongClusterEntity> selectByKey(@Param("clusterTag") String clusterTag, @Param("name") String name,
- @Param("type") String type);
+ List<InlongClusterTagEntity> selectByCondition(ClusterTagPageRequest request);
- List<InlongClusterEntity> selectByCondition(InlongClusterPageRequest request);
+ int updateById(InlongClusterTagEntity record);
- int updateByIdSelective(InlongClusterEntity record);
-
- int updateById(InlongClusterEntity record);
+ int updateByPrimaryKeySelective(InlongClusterTagEntity record);
int deleteByPrimaryKey(Integer id);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
index 775d8f21d..c0aa2f5cf 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ClusterSetMapper.xml
@@ -19,38 +19,32 @@
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper
- namespace="org.apache.inlong.manager.dao.mapper.ClusterSetMapper">
- <select id="selectProxyCluster"
- resultType="org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster">
- select name as cluster_name, cluster_tag, ext_tag, ext_params
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ClusterSetMapper">
+ <select id="selectProxyCluster" resultType="org.apache.inlong.manager.common.pojo.dataproxy.ProxyCluster">
+ select name as cluster_name, cluster_tags, ext_tag, ext_params
from inlong_cluster
where type = 'DATA_PROXY'
and is_deleted = '0'
</select>
- <select id="selectCacheCluster"
- resultType="org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster">
+ <select id="selectCacheCluster" resultType="org.apache.inlong.manager.common.pojo.dataproxy.CacheCluster">
select name as cluster_name,
type,
- cluster_tag,
+ cluster_tags,
ext_tag,
ext_params
from inlong_cluster
where type in ('PULSAR', 'KAFKA', 'TUBE')
and is_deleted = '0'
</select>
- <select id="selectInlongGroupId"
- resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId">
+ <select id="selectInlongGroupId" resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongGroupId">
select inlong_group_id,
- inlong_cluster_tag as
- cluster_tag,
+ inlong_cluster_tag as cluster_tag,
mq_resource as topic,
ext_params
from inlong_group
where is_deleted = 0
</select>
- <select id="selectInlongStreamId"
- resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId">
+ <select id="selectInlongStreamId" resultType="org.apache.inlong.manager.common.pojo.dataproxy.InlongStreamId">
select inlong_group_id,
inlong_stream_id,
mq_resource as topic,
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index 52b02f22c..4edbc18ba 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -25,7 +25,7 @@
<result column="name" jdbcType="VARCHAR" property="name"/>
<result column="type" jdbcType="VARCHAR" property="type"/>
<result column="url" jdbcType="VARCHAR" property="url"/>
- <result column="cluster_tag" jdbcType="VARCHAR" property="clusterTag"/>
+ <result column="cluster_tags" jdbcType="VARCHAR" property="clusterTags"/>
<result column="ext_tag" jdbcType="VARCHAR" property="extTag"/>
<result column="token" jdbcType="VARCHAR" property="token"/>
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
@@ -39,129 +39,25 @@
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
</resultMap>
<sql id="Base_Column_List">
- id, name, type, url, cluster_tag, ext_tag, token, ext_params, heartbeat,
+ id, name, type, url, cluster_tags, ext_tag, token, ext_params, heartbeat,
in_charges, status, is_deleted, creator, modifier, create_time, modify_time
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
insert into inlong_cluster (id, name, type,
- url, cluster_tag, ext_tag,
+ url, cluster_tags, ext_tag,
token, ext_params, heartbeat,
in_charges, status, is_deleted,
creator, modifier,
create_time, modify_time)
values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR},
- #{url,jdbcType=VARCHAR}, #{clusterTag,jdbcType=VARCHAR}, #{extTag,jdbcType=VARCHAR},
+ #{url,jdbcType=VARCHAR}, #{clusterTags,jdbcType=VARCHAR}, #{extTag,jdbcType=VARCHAR},
#{token,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, #{heartbeat,jdbcType=LONGVARCHAR},
#{inCharges,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
#{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
</insert>
- <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
- parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
- insert into inlong_cluster
- <trim prefix="(" suffix=")" suffixOverrides=",">
- <if test="id != null">
- id,
- </if>
- <if test="name != null">
- name,
- </if>
- <if test="type != null">
- type,
- </if>
- <if test="url != null">
- url,
- </if>
- <if test="clusterTag != null">
- cluster_tag,
- </if>
- <if test="extTag != null">
- ext_tag,
- </if>
- <if test="token != null">
- token,
- </if>
- <if test="inCharges != null">
- in_charges,
- </if>
- <if test="status != null">
- status,
- </if>
- <if test="isDeleted != null">
- is_deleted,
- </if>
- <if test="creator != null">
- creator,
- </if>
- <if test="modifier != null">
- modifier,
- </if>
- <if test="createTime != null">
- create_time,
- </if>
- <if test="modifyTime != null">
- modify_time,
- </if>
- <if test="extParams != null">
- ext_params,
- </if>
- <if test="heartbeat != null">
- heartbeat,
- </if>
- </trim>
- <trim prefix="values (" suffix=")" suffixOverrides=",">
- <if test="id != null">
- #{id,jdbcType=INTEGER},
- </if>
- <if test="name != null">
- #{name,jdbcType=VARCHAR},
- </if>
- <if test="type != null">
- #{type,jdbcType=VARCHAR},
- </if>
- <if test="url != null">
- #{url,jdbcType=VARCHAR},
- </if>
- <if test="clusterTag != null">
- #{clusterTag,jdbcType=VARCHAR},
- </if>
- <if test="extTag != null">
- #{extTag,jdbcType=VARCHAR},
- </if>
- <if test="token != null">
- #{token,jdbcType=VARCHAR},
- </if>
- <if test="inCharges != null">
- #{inCharges,jdbcType=VARCHAR},
- </if>
- <if test="status != null">
- #{status,jdbcType=INTEGER},
- </if>
- <if test="isDeleted != null">
- #{isDeleted,jdbcType=INTEGER},
- </if>
- <if test="creator != null">
- #{creator,jdbcType=VARCHAR},
- </if>
- <if test="modifier != null">
- #{modifier,jdbcType=VARCHAR},
- </if>
- <if test="createTime != null">
- #{createTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- <if test="extParams != null">
- #{extParams,jdbcType=LONGVARCHAR},
- </if>
- <if test="heartbeat != null">
- #{heartbeat,jdbcType=LONGVARCHAR},
- </if>
- </trim>
- </insert>
<select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
select
@@ -169,26 +65,6 @@
from inlong_cluster
where id = #{id,jdbcType=INTEGER}
</select>
-
- <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
- update inlong_cluster
- set name = #{name,jdbcType=VARCHAR},
- type = #{type,jdbcType=VARCHAR},
- url = #{url,jdbcType=VARCHAR},
- cluster_tag = #{clusterTag,jdbcType=VARCHAR},
- ext_tag = #{extTag,jdbcType=VARCHAR},
- token = #{token,jdbcType=VARCHAR},
- ext_params = #{extParams,jdbcType=LONGVARCHAR},
- heartbeat = #{heartbeat,jdbcType=LONGVARCHAR},
- in_charges = #{inCharges,jdbcType=VARCHAR},
- status = #{status,jdbcType=INTEGER},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- creator = #{creator,jdbcType=VARCHAR},
- modifier = #{modifier,jdbcType=VARCHAR},
- create_time = #{createTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP}
- where id = #{id,jdbcType=INTEGER}
- </update>
<select id="selectByKey" parameterType="org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest"
resultType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
select
@@ -198,7 +74,7 @@
is_deleted = 0
and `type` = #{type, jdbcType=VARCHAR}
<if test="clusterTag != null and clusterTag != ''">
- and cluster_tag = #{clusterTag, jdbcType=VARCHAR}
+ and find_in_set(#{clusterTag, jdbcType=VARCHAR}, cluster_tags)
</if>
<if test="name != null and name != ''">
and name = #{name, jdbcType=VARCHAR}
@@ -227,16 +103,15 @@
</foreach>
</if>
<if test="clusterTag != null and clusterTag != ''">
- and cluster_tag = #{clusterTag, jdbcType=VARCHAR}
+ and find_in_set(#{clusterTag, jdbcType=VARCHAR}, cluster_tags)
</if>
<if test="extTag != null and extTag != ''">
and ext_tag = #{extTag, jdbcType=VARCHAR}
</if>
<if test="keyword != null and keyword != ''">
name like CONCAT('%', #{keyword}, '%')
- or cluster_tag like CONCAT('%', #{keyword}, '%')
+ or cluster_tags like CONCAT('%', #{keyword}, '%')
or ext_tag like CONCAT('%', #{keyword}, '%')
- or url like CONCAT('%', #{keyword}, '%')
)
</if>
<if test="status != null and status != ''">
@@ -246,6 +121,25 @@
order by modify_time desc
</select>
+ <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
+ update inlong_cluster
+ set name = #{name,jdbcType=VARCHAR},
+ type = #{type,jdbcType=VARCHAR},
+ url = #{url,jdbcType=VARCHAR},
+ cluster_tags = #{clusterTags,jdbcType=VARCHAR},
+ ext_tag = #{extTag,jdbcType=VARCHAR},
+ token = #{token,jdbcType=VARCHAR},
+ ext_params = #{extParams,jdbcType=LONGVARCHAR},
+ heartbeat = #{heartbeat,jdbcType=LONGVARCHAR},
+ in_charges = #{inCharges,jdbcType=VARCHAR},
+ status = #{status,jdbcType=INTEGER},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ creator = #{creator,jdbcType=VARCHAR},
+ modifier = #{modifier,jdbcType=VARCHAR},
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
<update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterEntity">
update inlong_cluster
<set>
@@ -258,8 +152,8 @@
<if test="url != null">
url = #{url,jdbcType=VARCHAR},
</if>
- <if test="clusterTag != null">
- cluster_tag = #{clusterTag,jdbcType=VARCHAR},
+ <if test="clusterTags != null">
+ cluster_tags = #{clusterTags,jdbcType=VARCHAR},
</if>
<if test="extTag != null">
ext_tag = #{extTag,jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 732ed3c8a..3be803730 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -50,86 +50,6 @@
#{status,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
#{modifier,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
</insert>
- <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
- parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
- insert into inlong_cluster_node
- <trim prefix="(" suffix=")" suffixOverrides=",">
- <if test="id != null">
- id,
- </if>
- <if test="parentId != null">
- parent_id,
- </if>
- <if test="type != null">
- type,
- </if>
- <if test="ip != null">
- ip,
- </if>
- <if test="port != null">
- port,
- </if>
- <if test="extParams != null">
- ext_params,
- </if>
- <if test="status != null">
- status,
- </if>
- <if test="isDeleted != null">
- is_deleted,
- </if>
- <if test="creator != null">
- creator,
- </if>
- <if test="modifier != null">
- modifier,
- </if>
- <if test="createTime != null">
- create_time,
- </if>
- <if test="modifyTime != null">
- modify_time,
- </if>
- </trim>
- <trim prefix="values (" suffix=")" suffixOverrides=",">
- <if test="id != null">
- #{id,jdbcType=INTEGER},
- </if>
- <if test="parentId != null">
- #{parentId,jdbcType=INTEGER},
- </if>
- <if test="type != null">
- #{type,jdbcType=VARCHAR},
- </if>
- <if test="ip != null">
- #{ip,jdbcType=VARCHAR},
- </if>
- <if test="port != null">
- #{port,jdbcType=INTEGER},
- </if>
- <if test="extParams != null">
- #{extParams,jdbcType=LONGVARCHAR},
- </if>
- <if test="status != null">
- #{status,jdbcType=INTEGER},
- </if>
- <if test="isDeleted != null">
- #{isDeleted,jdbcType=INTEGER},
- </if>
- <if test="creator != null">
- #{creator,jdbcType=VARCHAR},
- </if>
- <if test="modifier != null">
- #{modifier,jdbcType=VARCHAR},
- </if>
- <if test="createTime != null">
- #{createTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- </trim>
- </insert>
<select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
select
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterTagEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterTagEntityMapper.xml
new file mode 100644
index 000000000..01df6f986
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterTagEntityMapper.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.InlongClusterTagEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="cluster_tag" jdbcType="VARCHAR" property="clusterTag"/>
+ <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
+ <result column="in_charges" jdbcType="VARCHAR" property="inCharges"/>
+ <result column="status" jdbcType="INTEGER" property="status"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+ <result column="creator" jdbcType="VARCHAR" property="creator"/>
+ <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, cluster_tag, ext_params, in_charges, status, is_deleted, creator, modifier, create_time, modify_time
+ </sql>
+
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+ parameterType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+ insert into inlong_cluster_tag (id, cluster_tag, ext_params,
+ in_charges, status, is_deleted,
+ creator, modifier,
+ create_time, modify_time)
+ values (#{id,jdbcType=INTEGER}, #{clusterTag,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+ #{inCharges,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
+ #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
+ #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
+ </insert>
+
+ <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_cluster_tag
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByTag" resultType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_cluster_tag
+ where cluster_tag = #{clusterTag, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </select>
+ <select id="selectByCondition"
+ parameterType="org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest"
+ resultType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_cluster_tag
+ <where>
+ is_deleted = 0
+ <if test="keyword != null and keyword != ''">
+ and cluster_tag like CONCAT('%', #{keyword}, '%')
+ </if>
+ </where>
+ order by modify_time desc
+ </select>
+
+ <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+ update inlong_cluster_tag
+ set cluster_tag = #{clusterTag,jdbcType=VARCHAR},
+ ext_params = #{extParams,jdbcType=LONGVARCHAR},
+ in_charges = #{inCharges,jdbcType=VARCHAR},
+ status = #{status,jdbcType=INTEGER},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ creator = #{creator,jdbcType=VARCHAR},
+ modifier = #{modifier,jdbcType=VARCHAR},
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <update id="updateByPrimaryKeySelective"
+ parameterType="org.apache.inlong.manager.dao.entity.InlongClusterTagEntity">
+ update inlong_cluster_tag
+ <set>
+ <if test="clusterTag != null">
+ cluster_tag = #{clusterTag,jdbcType=VARCHAR},
+ </if>
+ <if test="extParams != null">
+ ext_params = #{extParams,jdbcType=LONGVARCHAR},
+ </if>
+ <if test="inCharges != null">
+ in_charges = #{inCharges,jdbcType=VARCHAR},
+ </if>
+ <if test="status != null">
+ status = #{status,jdbcType=INTEGER},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="creator != null">
+ creator = #{creator,jdbcType=VARCHAR},
+ </if>
+ <if test="modifier != null">
+ modifier = #{modifier,jdbcType=VARCHAR},
+ </if>
+ <if test="createTime != null">
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ </if>
+ <if test="modifyTime != null">
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+ </if>
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete
+ from inlong_cluster_tag
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
index 807988ff3..1aaaa0299 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
@@ -44,7 +44,10 @@ public abstract class AbstractClusterOperator implements InlongClusterOperator {
this.setTargetEntity(request, entity);
entity.setCreator(operator);
- entity.setCreateTime(new Date());
+ entity.setModifier(operator);
+ Date now = new Date();
+ entity.setCreateTime(now);
+ entity.setModifyTime(now);
entity.setIsDeleted(InlongConstants.UN_DELETED);
clusterMapper.insert(entity);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index ad851dbf4..80d4d4ca3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -21,6 +21,9 @@ import com.github.pagehelper.PageInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
@@ -33,6 +36,49 @@ import java.util.List;
*/
public interface InlongClusterService {
+ /**
+ * Save cluster tag.
+ *
+ * @param request cluster tag
+ * @param operator name of operator
+ * @return cluster tag id after saving
+ */
+ Integer saveTag(ClusterTagRequest request, String operator);
+
+ /**
+ * Get cluster tag by id.
+ *
+ * @param id cluster tag id
+ * @return cluster tag info
+ */
+ ClusterTagResponse getTag(Integer id);
+
+ /**
+ * Paging query cluster tags according to conditions.
+ *
+ * @param request page request conditions
+ * @return cluster tag list
+ */
+ PageInfo<ClusterTagResponse> listTag(ClusterTagPageRequest request);
+
+ /**
+ * Update cluster tag.
+ *
+ * @param request cluster tag to be modified
+ * @param operator current operator
+ * @return whether succeed
+ */
+ Boolean updateTag(ClusterTagRequest request, String operator);
+
+ /**
+ * Delete cluster tag.
+ *
+ * @param id cluster tag id to be deleted
+ * @param operator current operator
+ * @return whether succeed
+ */
+ Boolean deleteTag(Integer id, String operator);
+
/**
* Save cluster info.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 47ba191c0..17a49dca0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -36,6 +36,9 @@ import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagResponse;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
@@ -48,8 +51,10 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterTagEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterTagEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
@@ -83,19 +88,118 @@ public class InlongClusterServiceImpl implements InlongClusterService {
@Autowired
private InlongClusterOperatorFactory clusterOperatorFactory;
@Autowired
+ private InlongClusterTagEntityMapper clusterTagMapper;
+ @Autowired
private InlongClusterEntityMapper clusterMapper;
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
@Autowired
private DataProxyConfigRepository proxyRepository;
+ @Override
+ public Integer saveTag(ClusterTagRequest request, String operator) {
+ LOGGER.debug("begin to save cluster tag {}", request);
+ Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
+ Preconditions.checkNotNull(request.getClusterTag(), "cluster tag cannot be empty");
+
+ // check if the cluster tag already exist
+ String clusterTag = request.getClusterTag();
+ InlongClusterTagEntity exist = clusterTagMapper.selectByTag(clusterTag);
+ if (exist != null) {
+ String errMsg = String.format("inlong cluster tag already exist for tag=%s", clusterTag);
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ InlongClusterTagEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterTagEntity::new);
+ entity.setCreator(operator);
+ entity.setModifier(operator);
+ Date now = new Date();
+ entity.setCreateTime(now);
+ entity.setModifyTime(now);
+ clusterTagMapper.insert(entity);
+ LOGGER.info("success to save cluster tag={} by user={}", request, operator);
+ return entity.getId();
+ }
+
+ @Override
+ public ClusterTagResponse getTag(Integer id) {
+ Preconditions.checkNotNull(id, "inlong cluster tag id cannot be empty");
+ InlongClusterTagEntity entity = clusterTagMapper.selectById(id);
+ if (entity == null) {
+ LOGGER.error("inlong cluster tag not found by id={}", id);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+ }
+
+ ClusterTagResponse response = CommonBeanUtils.copyProperties(entity, ClusterTagResponse::new);
+ LOGGER.debug("success to get cluster tag info by id={}", id);
+ return response;
+ }
+
+ @Override
+ public PageInfo<ClusterTagResponse> listTag(ClusterTagPageRequest request) {
+ PageHelper.startPage(request.getPageNum(), request.getPageSize());
+ Page<InlongClusterTagEntity> entityPage = (Page<InlongClusterTagEntity>) clusterTagMapper
+ .selectByCondition(request);
+ List<ClusterTagResponse> tagList = CommonBeanUtils.copyListProperties(entityPage, ClusterTagResponse::new);
+ PageInfo<ClusterTagResponse> page = new PageInfo<>(tagList);
+ page.setTotal(tagList.size());
+
+ LOGGER.debug("success to list cluster tag by {}", request);
+ return page;
+ }
+
+ @Override
+ public Boolean updateTag(ClusterTagRequest request, String operator) {
+ LOGGER.debug("begin to update cluster tag={}", request);
+ Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
+ Preconditions.checkNotNull(request.getClusterTag(), "inlong cluster tag cannot be empty");
+
+ Integer id = request.getId();
+ Preconditions.checkNotNull(id, "cluster tag id cannot be empty");
+ // check cluster tag if exist
+ InlongClusterTagEntity exist = clusterTagMapper.selectByTag(request.getClusterTag());
+ if (exist != null && !Objects.equals(id, exist.getId())) {
+ String errMsg = String.format("inlong cluster tag already exist for tag=%s", request.getClusterTag());
+ LOGGER.error(errMsg);
+ throw new BusinessException(errMsg);
+ }
+
+ InlongClusterTagEntity entity = clusterTagMapper.selectById(id);
+ if (entity == null) {
+ LOGGER.error("cluster tag not found by id={}", id);
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+ }
+ CommonBeanUtils.copyProperties(request, entity, true);
+ entity.setModifier(operator);
+ entity.setModifyTime(new Date());
+ clusterTagMapper.updateById(entity);
+ LOGGER.info("success to update cluster tag={}", request);
+ return true;
+ }
+
+ @Override
+ public Boolean deleteTag(Integer id, String operator) {
+ Preconditions.checkNotNull(id, "cluster tag id cannot be empty");
+ InlongClusterTagEntity entity = clusterTagMapper.selectById(id);
+ if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
+ LOGGER.error("inlong cluster tag not found by id={}", id);
+ return false;
+ }
+ entity.setIsDeleted(entity.getId());
+ entity.setModifier(operator);
+ clusterTagMapper.updateById(entity);
+ LOGGER.info("success to delete cluster tag by id={}", id);
+ return true;
+ }
+
@Override
public Integer save(InlongClusterRequest request, String operator) {
LOGGER.debug("begin to save inlong cluster={}", request);
- Preconditions.checkNotNull(request, "inlong cluster info cannot be empty");
+ Preconditions.checkNotNull(request, "inlong cluster request cannot be empty");
// check if the cluster already exist
- String clusterTag = request.getClusterTag();
+ String clusterTag = request.getClusterTags();
String name = request.getName();
String type = request.getType();
List<InlongClusterEntity> exist = clusterMapper.selectByKey(clusterTag, name, type);
@@ -166,7 +270,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
Preconditions.checkNotNull(id, "inlong cluster id cannot be empty");
// check whether the cluster already exists
- String clusterTag = request.getClusterTag();
+ String clusterTag = request.getClusterTags();
String name = request.getName();
String type = request.getType();
List<InlongClusterEntity> exist = clusterMapper.selectByKey(clusterTag, name, type);
@@ -221,7 +325,10 @@ public class InlongClusterServiceImpl implements InlongClusterService {
InlongClusterNodeEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
entity.setCreator(operator);
- entity.setCreateTime(new Date());
+ entity.setModifier(operator);
+ Date now = new Date();
+ entity.setCreateTime(now);
+ entity.setModifyTime(now);
entity.setIsDeleted(InlongConstants.UN_DELETED);
clusterNodeMapper.insert(entity);
@@ -283,7 +390,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
// check cluster node if exist
InlongClusterNodeEntity exist = clusterNodeMapper.selectByUniqueKey(request);
if (exist != null && !Objects.equals(id, exist.getId())) {
- String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s)",
+ String errMsg = String.format("inlong cluster node already exist for type=%s ip=%s port=%s",
request.getType(), request.getIp(), request.getPort());
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
@@ -366,7 +473,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
// get all inlong groups which was successful and belongs to this data proxy cluster
List<String> clusterTagList = clusterList.stream()
- .map(InlongClusterEntity::getClusterTag)
+ .map(InlongClusterEntity::getClusterTags)
.collect(Collectors.toList());
InlongGroupPageRequest groupRequest = InlongGroupPageRequest.builder()
.status(GroupStatus.CONFIG_SUCCESSFUL.getCode())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
index fcb834fd4..b394732f3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
@@ -20,8 +20,8 @@ package org.apache.inlong.manager.service.core.impl;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.common.enums.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.DataNodeType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.node.DataNodePageRequest;
import org.apache.inlong.manager.common.pojo.node.DataNodeRequest;
@@ -71,7 +71,10 @@ public class DataNodeServiceImpl implements DataNodeService {
}
DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
entity.setCreator(operator);
- entity.setCreateTime(new Date());
+ entity.setModifier(operator);
+ Date now = new Date();
+ entity.setCreateTime(now);
+ entity.setModifyTime(now);
entity.setIsDeleted(InlongConstants.UN_DELETED);
dataNodeMapper.insert(entity);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index 530f49cd5..e9088d055 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -140,7 +140,7 @@ public class DataProxyConfigRepository implements IRepository {
Map<String, String> tagMap = MAP_SPLITTER.split(cacheCluster.getExtTag());
String producerTag = tagMap.getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString());
if (StringUtils.equalsIgnoreCase(producerTag, Boolean.TRUE.toString())) {
- cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTag(), k -> new HashMap<>())
+ cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTags(), k -> new HashMap<>())
.computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList<>()).add(cacheCluster);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 4d396b614..42153c93c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -104,7 +104,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
List<SinkField> fields = request.getSinkFieldList();
// Remove id in sinkField when save
if (CollectionUtils.isNotEmpty(fields)) {
- fields.stream().forEach(sinkField -> sinkField.setId(null));
+ fields.forEach(sinkField -> sinkField.setId(null));
}
int id = operation.saveOpt(request, operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 62ccb3295..94220b023 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -88,7 +88,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
// Remove id in sourceField when save
List<StreamField> streamFields = request.getFieldList();
if (CollectionUtils.isNotEmpty(streamFields)) {
- streamFields.stream().forEach(streamField -> streamField.setId(null));
+ streamFields.forEach(streamField -> streamField.setId(null));
}
int id = operation.saveOpt(request, groupEntity.getStatus(), operator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 0b5216654..73fc43398 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -47,7 +47,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
*/
public Integer saveDataProxyCluster(String clusterTag, String clusterName, String extTag) {
DataProxyClusterRequest request = new DataProxyClusterRequest();
- request.setClusterTag(clusterTag);
+ request.setClusterTags(clusterTag);
request.setName(clusterName);
request.setType(ClusterType.CLS_DATA_PROXY);
request.setExtTag(extTag);
@@ -60,7 +60,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
*/
public Integer savePulsarCluster(String clusterTag, String clusterName, String adminUrl) {
PulsarClusterRequest request = new PulsarClusterRequest();
- request.setClusterTag(clusterTag);
+ request.setClusterTags(clusterTag);
request.setName(clusterName);
request.setType(ClusterType.CLS_PULSAR);
request.setAdminUrl(adminUrl);
@@ -86,7 +86,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
PulsarClusterRequest request = new PulsarClusterRequest();
request.setId(id);
request.setName(name);
- request.setClusterTag(clusterTag);
+ request.setClusterTags(clusterTag);
request.setAdminUrl(adminUrl);
request.setInCharges(GLOBAL_OPERATOR);
return clusterService.update(request, GLOBAL_OPERATOR);
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 4db31bd8d..23491c6bc 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -33,13 +33,13 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
`peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
`max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
- `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
- `followers` varchar(512) DEFAULT NULL COMMENT 'Name of followers, separated by commas',
`enable_zookeeper` tinyint(2) DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
`enable_create_resource` tinyint(2) DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
`lightweight` tinyint(2) DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-false, 1-true',
`inlong_cluster_tag` varchar(128) DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
- `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num,',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num',
+ `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+ `followers` varchar(512) DEFAULT NULL COMMENT 'Name of followers, separated by commas',
`status` int(4) DEFAULT '100' COMMENT 'Inlong group status',
`previous_status` int(4) DEFAULT '100' COMMENT 'Previous group status',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
@@ -49,7 +49,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
- INDEX group_status_deleted_idx (`status`, `is_deleted`)
+ INDEX group_status_deleted_index (`status`, `is_deleted`)
);
-- ----------------------------
@@ -64,33 +64,52 @@ CREATE TABLE IF NOT EXISTS `inlong_group_ext`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- KEY `index_group_id` (`inlong_group_id`),
+ KEY `group_id_index` (`inlong_group_id`),
UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`)
);
-- ----------------------------
--- Table structure for inlong_cluster
+-- Table structure for inlong_cluster_tag
-- ----------------------------
-CREATE TABLE IF NOT EXISTS `inlong_cluster`
+CREATE TABLE IF NOT EXISTS `inlong_cluster_tag`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `name` varchar(128) NOT NULL COMMENT 'Cluster name',
- `type` varchar(20) DEFAULT '' COMMENT 'Cluster type, such as: TUBE, PULSAR, DATA_PROXY, etc',
- `url` varchar(512) DEFAULT NULL COMMENT 'Cluster URL',
- `cluster_tag` varchar(128) DEFAULT NULL COMMENT 'Cluster tag, the same tab indicates that cluster belongs to the same set',
- `ext_tag` varchar(128) DEFAULT NULL COMMENT 'Extension tag, for extended use',
- `token` varchar(512) DEFAULT NULL COMMENT 'Cluster token',
- `ext_params` text DEFAULT NULL COMMENT 'Extended params, will saved as JSON string',
- `heartbeat` text DEFAULT NULL COMMENT 'Cluster heartbeat info',
+ `cluster_tag` varchar(128) NOT NULL COMMENT 'Cluster tag',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_inlong_cluster_tag` (`cluster_tag`, `is_deleted`)
+);
+
+-- ----------------------------
+-- Table structure for inlong_cluster
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_cluster`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `name` varchar(128) NOT NULL COMMENT 'Cluster name',
+ `type` varchar(20) DEFAULT '' COMMENT 'Cluster type, such as: TUBE, PULSAR, DATA_PROXY, etc',
+ `url` varchar(512) DEFAULT NULL COMMENT 'Cluster URL',
+ `cluster_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster tag, separated by commas',
+ `ext_tag` varchar(128) DEFAULT NULL COMMENT 'Extension tag, for extended use',
+ `token` varchar(512) DEFAULT NULL COMMENT 'Cluster token',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+ `heartbeat` text DEFAULT NULL COMMENT 'Cluster heartbeat info',
+ `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+ `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_cluster_index` (`name`, `type`, `cluster_tag`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`)
);
-- ----------------------------
@@ -100,18 +119,18 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster',
- `type` varchar(20) DEFAULT '' COMMENT 'Cluster type, such as: DATA_PROXY, AGENT, etc',
+ `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: DATA_PROXY, AGENT, etc',
`ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
`port` int(6) NULL COMMENT 'Cluster port',
- `ext_params` text DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
- `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `ext_params` text DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
+ `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
);
-- ----------------------------
@@ -121,20 +140,20 @@ CREATE TABLE IF NOT EXISTS `data_node`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`name` varchar(128) NOT NULL COMMENT 'Node name',
- `type` varchar(20) DEFAULT '' COMMENT 'Node type, such as: MYSQL, HIVE, KAFKA, ES, etc',
- `url` varchar(512) DEFAULT NULL COMMENT 'Node URL',
- `username` varchar(128) DEFAULT NULL COMMENT 'Username for node if needed',
- `token` varchar(512) DEFAULT NULL COMMENT 'Node token',
- `ext_params` text DEFAULT NULL COMMENT 'Extended params, will saved as JSON string',
+ `type` varchar(20) DEFAULT '' COMMENT 'Node type, such as: MYSQL, HIVE, KAFKA, ES, etc',
+ `url` varchar(512) DEFAULT NULL COMMENT 'Node URL',
+ `username` varchar(128) DEFAULT NULL COMMENT 'Username for node if needed',
+ `token` varchar(512) DEFAULT NULL COMMENT 'Node token',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `status` int(4) DEFAULT '0' COMMENT 'Node status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_data_node_index` (`name`, `type`, `is_deleted`)
+ UNIQUE KEY `unique_data_node` (`name`, `type`, `is_deleted`)
);
-- ----------------------------
@@ -189,7 +208,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_cmd_config`
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`result_info` varchar(64) DEFAULT NULL,
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
);
-- ----------------------------
@@ -200,27 +220,27 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Owning inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id, non-deleted globally unique',
- `name` varchar(64) DEFAULT NULL COMMENT 'The name of the inlong stream page display, can be Chinese',
- `description` varchar(256) DEFAULT '' COMMENT 'Introduction to inlong stream',
- `mq_resource` varchar(128) DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of Tube, corresponding to the topic of Pulsar',
- `data_type` varchar(20) DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
- `data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
- `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
- `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
- `sync_send` tinyint(1) DEFAULT '0' COMMENT 'order_preserving 0: none, 1: yes',
- `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
- `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
- `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
- `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
- `storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
- `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
- `status` int(4) DEFAULT '100' COMMENT 'Inlong stream status',
- `previous_status` int(4) DEFAULT '100' COMMENT 'Previous status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `name` varchar(64) DEFAULT NULL COMMENT 'The name of the inlong stream page display, can be Chinese',
+ `description` varchar(256) DEFAULT '' COMMENT 'Introduction to inlong stream',
+ `mq_resource` varchar(128) DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of Tube, corresponding to the topic of Pulsar',
+ `data_type` varchar(20) DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
+ `data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
+ `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
+ `sync_send` tinyint(1) DEFAULT '0' COMMENT 'order_preserving 0: none, 1: yes',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+ `storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+ `status` int(4) DEFAULT '100' COMMENT 'Inlong stream status',
+ `previous_status` int(4) DEFAULT '100' COMMENT 'Previous status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
);
@@ -238,8 +258,8 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
- KEY `index_stream_id` (`inlong_group_id`, `inlong_stream_id`)
+ UNIQUE KEY `unique_inlong_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
+ KEY `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
);
-- ----------------------------
@@ -262,7 +282,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `index_field_stream_id` (`inlong_stream_id`)
+ KEY `field_stream_id_index` (`inlong_stream_id`)
);
-- ----------------------------
@@ -302,8 +322,8 @@ CREATE TABLE IF NOT EXISTS `role`
`update_by` varchar(256) NOT NULL,
`disabled` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_role_code_idx` (`role_code`),
- UNIQUE KEY `unique_role_name_idx` (`role_name`)
+ UNIQUE KEY `unique_role_code` (`role_code`),
+ UNIQUE KEY `unique_role_name` (`role_name`)
);
-- ----------------------------
@@ -366,8 +386,8 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
+ `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
@@ -382,12 +402,12 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
- KEY `source_status_idx` (`status`, `is_deleted`),
- KEY `source_agent_ip_idx` (`agent_ip`, `is_deleted`)
+ KEY `source_status_index` (`status`, `is_deleted`),
+ KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`)
);
-- ----------------------------
@@ -429,15 +449,15 @@ CREATE TABLE IF NOT EXISTS `stream_sink`
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
`sort_task_name` varchar(512) DEFAULT NULL COMMENT 'Sort task name or task ID',
`sort_consumer_group` varchar(512) DEFAULT NULL COMMENT 'Consumer group name for Sort task',
- `ext_params` text COMMENT 'Another fields, will be saved as JSON type',
- `operate_log` varchar(5000) DEFAULT NULL COMMENT 'Background operate log',
+ `ext_params` text NULL COMMENT 'Another fields, will be saved as JSON type',
+ `operate_log` text DEFAULT NULL COMMENT 'Background operate log',
`status` int(11) DEFAULT '100' COMMENT 'Status',
`previous_status` int(11) DEFAULT '100' COMMENT 'Previous status',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_sink_name` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
);
@@ -455,7 +475,7 @@ CREATE TABLE IF NOT EXISTS `stream_sink_ext`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- KEY `index_sink_id` (`sink_id`)
+ KEY `sink_id_index` (`sink_id`)
);
-- ----------------------------
@@ -479,9 +499,8 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `index_source_id` (`source_id`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
+ KEY `source_id_index` (`source_id`)
+);
-- ----------------------------
-- Table structure for stream_transform_field
@@ -503,12 +522,12 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `origin_node_name` varchar(256) DEFAULT '' COMMENT 'Origin Node name which stream field belongs',
+ `origin_node_name` varchar(256) DEFAULT '' COMMENT 'Origin node name which stream field belongs',
+ -- The source node name of the transport field
`origin_field_name` varchar(50) DEFAULT '' COMMENT 'Origin field name before transform operation',
PRIMARY KEY (`id`),
- KEY `index_transform_id` (`transform_id`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform field table';
+ KEY `transform_id_index` (`transform_id`)
+);
-- ----------------------------
-- Table structure for stream_sink_field
@@ -549,7 +568,7 @@ CREATE TABLE IF NOT EXISTS `user`
`create_by` varchar(256) NOT NULL COMMENT 'create by sb.',
`update_by` varchar(256) DEFAULT NULL COMMENT 'update by sb.',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_user_name_idx` (`name`)
+ UNIQUE KEY `unique_user_name` (`name`)
);
-- ----------------------------
@@ -558,13 +577,13 @@ CREATE TABLE IF NOT EXISTS `user`
CREATE TABLE IF NOT EXISTS `user_role`
(
`id` int(11) NOT NULL AUTO_INCREMENT,
- `user_name` varchar(256) NOT NULL COMMENT 'username',
- `role_code` varchar(256) NOT NULL COMMENT 'role code',
+ `user_name` varchar(256) NOT NULL COMMENT 'Username',
+ `role_code` varchar(256) NOT NULL COMMENT 'User role code',
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`create_by` varchar(256) NOT NULL,
`update_by` varchar(256) NOT NULL,
- `disabled` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
+ `disabled` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it disabled, 0-enabled, 1-disabled',
PRIMARY KEY (`id`)
);
@@ -621,7 +640,7 @@ CREATE TABLE IF NOT EXISTS `workflow_event_log`
`remark` text COMMENT 'Execution result remark information',
`exception` text COMMENT 'Exception information',
PRIMARY KEY (`id`),
- INDEX group_status_idx (`inlong_group_id`, `status`)
+ INDEX group_status_index (`inlong_group_id`, `status`)
);
-- ----------------------------
@@ -667,7 +686,7 @@ CREATE TABLE IF NOT EXISTS `workflow_task`
`end_time` datetime DEFAULT NULL COMMENT 'End time',
`ext_params` text COMMENT 'Extended information-json',
PRIMARY KEY (`id`),
- INDEX process_status_idx (`process_id`, `status`)
+ INDEX process_status_index (`process_id`, `status`)
);
-- ----------------------------
@@ -709,7 +728,7 @@ CREATE TABLE IF NOT EXISTS `sort_cluster_config`
`task_name` varchar(128) NOT NULL COMMENT 'Task name',
`sink_type` varchar(128) NOT NULL COMMENT 'Type of sink',
PRIMARY KEY (`id`),
- KEY `index_sort_cluster_config` (`cluster_name`)
+ KEY `sort_cluster_config_index` (`cluster_name`)
);
-- ----------------------------
@@ -724,7 +743,7 @@ CREATE TABLE IF NOT EXISTS `sort_task_id_param`
`param_key` varchar(128) NOT NULL COMMENT 'Key of param',
`param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
PRIMARY KEY (`id`),
- KEY `index_sort_task_id_param` (`task_name`)
+ KEY `sort_task_id_param_index` (`task_name`)
);
-- ----------------------------
@@ -738,7 +757,7 @@ CREATE TABLE IF NOT EXISTS `sort_task_sink_param`
`param_key` varchar(128) NOT NULL COMMENT 'Key of param',
`param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
PRIMARY KEY (`id`),
- KEY `index_sort_task_sink_params` (`task_name`, `sink_type`)
+ KEY `sort_task_sink_params_index` (`task_name`, `sink_type`)
);
-- ----------------------------
@@ -753,7 +772,7 @@ CREATE TABLE IF NOT EXISTS `sort_source_config`
`topic` varchar(128) DEFAULT '' COMMENT 'Topic',
`ext_params` text DEFAULT NULL COMMENT 'Another fields, will be saved as JSON type',
PRIMARY KEY (`id`),
- KEY `index_sort_source_config` (`cluster_name`, `task_name`)
+ KEY `sort_source_config_index` (`cluster_name`, `task_name`)
);
@@ -785,7 +804,7 @@ CREATE TABLE IF NOT EXISTS `component_heartbeat`
`instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
`status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
`metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Report time',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
@@ -803,7 +822,7 @@ CREATE TABLE IF NOT EXISTS `group_heartbeat`
`inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
`status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
`metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Report time',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
@@ -822,7 +841,7 @@ CREATE TABLE IF NOT EXISTS `stream_heartbeat`
`inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
`status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
`metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Report time',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 0a4a2727a..d5a94d94f 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -55,7 +55,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_inlong_group` (`inlong_group_id`, `is_deleted`),
- INDEX group_status_deleted_idx (`status`, `is_deleted`)
+ INDEX group_status_deleted_index (`status`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong group table';
@@ -71,34 +71,54 @@ CREATE TABLE IF NOT EXISTS `inlong_group_ext`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- KEY `index_group_id` (`inlong_group_id`),
- UNIQUE KEY `unique_group_key` (`inlong_group_id`, `key_name`)
+ KEY `group_id_index` (`inlong_group_id`),
+ UNIQUE KEY `unique_inlong_group_key` (`inlong_group_id`, `key_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong group extension table';
-- ----------------------------
--- Table structure for inlong_cluster
+-- Table structure for inlong_cluster_tag
-- ----------------------------
-CREATE TABLE IF NOT EXISTS `inlong_cluster`
+CREATE TABLE IF NOT EXISTS `inlong_cluster_tag`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `name` varchar(128) NOT NULL COMMENT 'Cluster name',
- `type` varchar(20) DEFAULT '' COMMENT 'Cluster type, such as: TUBE, PULSAR, DATA_PROXY, etc',
- `url` varchar(512) DEFAULT NULL COMMENT 'Cluster URL',
- `cluster_tag` varchar(128) DEFAULT NULL COMMENT 'Cluster tag, the same tab indicates that cluster belongs to the same set',
- `ext_tag` varchar(128) DEFAULT NULL COMMENT 'Extension tag, for extended use',
- `token` varchar(512) DEFAULT NULL COMMENT 'Cluster token',
- `ext_params` text DEFAULT NULL COMMENT 'Extended params, will saved as JSON string',
- `heartbeat` text DEFAULT NULL COMMENT 'Cluster heartbeat info',
+ `cluster_tag` varchar(128) NOT NULL COMMENT 'Cluster tag',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_inlong_cluster_tag` (`cluster_tag`, `is_deleted`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster tag table';
+
+-- ----------------------------
+-- Table structure for inlong_cluster
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_cluster`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `name` varchar(128) NOT NULL COMMENT 'Cluster name',
+ `type` varchar(20) DEFAULT '' COMMENT 'Cluster type, such as: TUBE, PULSAR, DATA_PROXY, etc',
+ `url` varchar(512) DEFAULT NULL COMMENT 'Cluster URL',
+ `cluster_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster tag, separated by commas',
+ `ext_tag` varchar(128) DEFAULT NULL COMMENT 'Extension tag, for extended use',
+ `token` varchar(512) DEFAULT NULL COMMENT 'Cluster token',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+ `heartbeat` text DEFAULT NULL COMMENT 'Cluster heartbeat info',
+ `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+ `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_cluster_index` (`name`, `type`, `cluster_tag`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_cluster` (`name`, `type`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster table';
@@ -109,18 +129,18 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`parent_id` int(11) NOT NULL COMMENT 'Id of the parent cluster',
- `type` varchar(20) DEFAULT '' COMMENT 'Cluster type, such as: DATA_PROXY, AGENT, etc',
- `ip` varchar(512) NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
+ `type` varchar(20) NOT NULL COMMENT 'Cluster type, such as: DATA_PROXY, AGENT, etc',
+ `ip` varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
`port` int(6) NULL COMMENT 'Cluster port',
- `ext_params` text DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
- `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `ext_params` text DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
+ `status` int(4) DEFAULT '0' COMMENT 'Cluster status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
+ UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster node table';
@@ -131,20 +151,20 @@ CREATE TABLE IF NOT EXISTS `data_node`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`name` varchar(128) NOT NULL COMMENT 'Node name',
- `type` varchar(20) DEFAULT '' COMMENT 'Node type, such as: MYSQL, HIVE, KAFKA, ES, etc',
- `url` varchar(512) DEFAULT NULL COMMENT 'Node URL',
- `username` varchar(128) DEFAULT NULL COMMENT 'Username for node',
- `token` varchar(512) DEFAULT NULL COMMENT 'Node token',
- `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+ `type` varchar(20) DEFAULT '' COMMENT 'Node type, such as: MYSQL, HIVE, KAFKA, ES, etc',
+ `url` varchar(512) DEFAULT NULL COMMENT 'Node URL',
+ `username` varchar(128) DEFAULT NULL COMMENT 'Username for node if needed',
+ `token` varchar(512) DEFAULT NULL COMMENT 'Node token',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(4) DEFAULT '0' COMMENT 'Node status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `status` int(4) DEFAULT '0' COMMENT 'Node status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_data_node_index` (`name`, `type`, `is_deleted`)
+ UNIQUE KEY `unique_data_node` (`name`, `type`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Data node table';
@@ -157,7 +177,7 @@ CREATE TABLE IF NOT EXISTS `consumption`
`consumer_group` varchar(256) NOT NULL COMMENT 'Consumer group',
`in_charges` varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `mq_type` varchar(10) DEFAULT 'TUBE' COMMENT 'Message queue, high throughput: TUBE, high consistency: PULSAR',
+ `mq_type` varchar(10) DEFAULT 'TUBE' COMMENT 'Message queue type, high throughput: TUBE, high consistency: PULSAR',
`topic` varchar(256) NOT NULL COMMENT 'Consumption topic',
`filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
`inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream ID for consumption, if filter_enable is 1, it cannot empty',
@@ -203,7 +223,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_cmd_config`
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`result_info` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`),
- KEY `index_1` (`task_id`, `bSend`, `specified_data_time`)
+ KEY `unique_source_cmd_config` (`task_id`, `bSend`, `specified_data_time`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
@@ -215,27 +235,27 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Owning inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id, non-deleted globally unique',
- `name` varchar(64) DEFAULT NULL COMMENT 'The name of the inlong stream page display, can be Chinese',
- `description` varchar(256) DEFAULT '' COMMENT 'Introduction to inlong stream',
- `mq_resource` varchar(128) DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of Tube, corresponding to the topic of Pulsar',
- `data_type` varchar(20) DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
- `data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
- `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
- `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
- `sync_send` tinyint(1) DEFAULT '0' COMMENT 'order_preserving 0: none, 1: yes',
- `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
- `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
- `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
- `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
- `storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
- `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
- `status` int(4) DEFAULT '100' COMMENT 'Inlong stream status',
- `previous_status` int(4) DEFAULT '100' COMMENT 'Previous status',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
- `creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `name` varchar(64) DEFAULT NULL COMMENT 'The name of the inlong stream page display, can be Chinese',
+ `description` varchar(256) DEFAULT '' COMMENT 'Introduction to inlong stream',
+ `mq_resource` varchar(128) DEFAULT NULL COMMENT 'MQ resource, in one stream, corresponding to the filter ID of Tube, corresponding to the topic of Pulsar',
+ `data_type` varchar(20) DEFAULT NULL COMMENT 'Data type, including: CSV, KEY-VALUE, JSON, AVRO, etc.',
+ `data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK, etc.',
+ `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
+ `sync_send` tinyint(1) DEFAULT '0' COMMENT 'order_preserving 0: none, 1: yes',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+ `storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
+ `ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+ `status` int(4) DEFAULT '100' COMMENT 'Inlong stream status',
+ `previous_status` int(4) DEFAULT '100' COMMENT 'Previous status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
) ENGINE = InnoDB
@@ -254,8 +274,8 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
- KEY `index_stream_id` (`inlong_group_id`, `inlong_stream_id`)
+ UNIQUE KEY `unique_inlong_stream_key` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
+ KEY `stream_id_index` (`inlong_group_id`, `inlong_stream_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream extension table';
@@ -279,7 +299,7 @@ CREATE TABLE IF NOT EXISTS `inlong_stream_field`
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `index_field_stream_id` (`inlong_stream_id`)
+ KEY `field_stream_id_index` (`inlong_stream_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='File/DB data source field table';
@@ -321,8 +341,8 @@ CREATE TABLE IF NOT EXISTS `role`
`update_by` varchar(256) NOT NULL,
`disabled` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_role_code_idx` (`role_code`),
- UNIQUE KEY `unique_role_name_idx` (`role_name`)
+ UNIQUE KEY `unique_role_code` (`role_code`),
+ UNIQUE KEY `unique_role_name` (`role_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Role Table';
@@ -404,12 +424,12 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
- KEY `source_status_idx` (`status`, `is_deleted`),
- KEY `source_agent_ip_idx` (`agent_ip`, `is_deleted`)
+ KEY `source_status_index` (`status`, `is_deleted`),
+ KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
@@ -433,7 +453,7 @@ CREATE TABLE IF NOT EXISTS `stream_transform`
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_transform_name` (`inlong_group_id`, `inlong_stream_id`, `transform_name`, `is_deleted`) USING BTREE
+ UNIQUE KEY `unique_transform_name` (`inlong_group_id`, `inlong_stream_id`, `transform_name`, `is_deleted`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform table';
@@ -460,8 +480,8 @@ CREATE TABLE IF NOT EXISTS `stream_sink`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_sink_name` (`inlong_group_id`, `inlong_stream_id`, `sink_name`, `is_deleted`)
) ENGINE = InnoDB
@@ -480,7 +500,7 @@ CREATE TABLE IF NOT EXISTS `stream_sink_ext`
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- KEY `index_sink_id` (`sink_id`)
+ KEY `sink_id_index` (`sink_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink extension table';
@@ -505,7 +525,7 @@ CREATE TABLE IF NOT EXISTS `stream_source_field`
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `index_source_id` (`source_id`)
+ KEY `source_id_index` (`source_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
@@ -533,7 +553,7 @@ CREATE TABLE IF NOT EXISTS `stream_transform_field`
-- The source node name of the transport field
`origin_field_name` varchar(50) DEFAULT '' COMMENT 'Origin field name before transform operation',
PRIMARY KEY (`id`),
- KEY `index_transform_id` (`transform_id`)
+ KEY `transform_id_index` (`transform_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform field table';
@@ -577,7 +597,7 @@ CREATE TABLE IF NOT EXISTS `user`
`create_by` varchar(256) NOT NULL COMMENT 'create by sb.',
`update_by` varchar(256) DEFAULT NULL COMMENT 'update by sb.',
PRIMARY KEY (`id`),
- UNIQUE KEY `unique_user_name_idx` (`name`)
+ UNIQUE KEY `unique_user_name` (`name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='User table';
@@ -592,13 +612,13 @@ VALUES ('admin', '628ed559bff5ae36bd2184d4216973cf', 0, '2099-12-31 23:59:59',
CREATE TABLE IF NOT EXISTS `user_role`
(
`id` int(11) NOT NULL AUTO_INCREMENT,
- `user_name` varchar(256) NOT NULL COMMENT 'username rtx',
- `role_code` varchar(256) NOT NULL COMMENT 'role',
+ `user_name` varchar(256) NOT NULL COMMENT 'Username',
+ `role_code` varchar(256) NOT NULL COMMENT 'User role code',
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`create_by` varchar(256) NOT NULL,
`update_by` varchar(256) NOT NULL,
- `disabled` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
+ `disabled` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it disabled, 0-enabled, 1-disabled',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='User Role Table';
@@ -657,7 +677,7 @@ CREATE TABLE IF NOT EXISTS `workflow_event_log`
`remark` text COMMENT 'Execution result remark information',
`exception` text COMMENT 'Exception information',
PRIMARY KEY (`id`),
- INDEX group_status_idx (`inlong_group_id`, `status`)
+ INDEX group_status_index (`inlong_group_id`, `status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow event log table';
@@ -705,7 +725,7 @@ CREATE TABLE IF NOT EXISTS `workflow_task`
`end_time` datetime DEFAULT NULL COMMENT 'End time',
`ext_params` text COMMENT 'Extended information-json',
PRIMARY KEY (`id`),
- INDEX process_status_idx (`process_id`, `status`)
+ INDEX process_status_index (`process_id`, `status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow task table';
@@ -749,7 +769,7 @@ CREATE TABLE IF NOT EXISTS `sort_cluster_config`
`task_name` varchar(128) NOT NULL COMMENT 'Task name',
`sink_type` varchar(128) NOT NULL COMMENT 'Type of sink',
PRIMARY KEY (`id`),
- KEY `index_sort_cluster_config` (`cluster_name`)
+ KEY `sort_cluster_config_index` (`cluster_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Sort cluster config table';
@@ -765,7 +785,7 @@ CREATE TABLE IF NOT EXISTS `sort_task_id_param`
`param_key` varchar(128) NOT NULL COMMENT 'Key of param',
`param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
PRIMARY KEY (`id`),
- KEY `index_sort_task_id_param` (`task_name`)
+ KEY `sort_task_id_param_index` (`task_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Sort task id params table';
@@ -780,7 +800,7 @@ CREATE TABLE IF NOT EXISTS `sort_task_sink_param`
`param_key` varchar(128) NOT NULL COMMENT 'Key of param',
`param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
PRIMARY KEY (`id`),
- KEY `index_sort_task_sink_params` (`task_name`, `sink_type`)
+ KEY `sort_task_sink_params_index` (`task_name`, `sink_type`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Sort task sink params table';
@@ -796,7 +816,7 @@ CREATE TABLE IF NOT EXISTS `sort_source_config`
`topic` varchar(128) DEFAULT '' COMMENT 'Topic',
`ext_params` text DEFAULT NULL COMMENT 'Another fields, will be saved as JSON type',
PRIMARY KEY (`id`),
- KEY `index_sort_source_config` (`cluster_name`, `task_name`)
+ KEY `sort_source_config_index` (`cluster_name`, `task_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Sort source config table';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index 23a908e6b..93d661dbc 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -25,15 +25,19 @@ import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterNodeResponse;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagPageRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagRequest;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterTagResponse;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
-import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.user.UserRoleCode;
import org.apache.inlong.manager.common.util.LoginUserUtils;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.operationlog.OperationLog;
import org.apache.shiro.authz.annotation.RequiresRoles;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -47,44 +51,83 @@ import org.springframework.web.bind.annotation.RestController;
* Inlong cluster controller
*/
@RestController
-@RequestMapping("/cluster")
@Api(tags = "Inlong-Cluster-API")
public class InlongClusterController {
@Autowired
private InlongClusterService clusterService;
- @PostMapping(value = "/save")
+ @PostMapping(value = "/cluster/tag/save")
+ @ApiOperation(value = "Save cluster tag")
+ @OperationLog(operation = OperationType.CREATE)
+ @RequiresRoles(value = UserRoleCode.ADMIN)
+ public Response<Integer> saveTag(@Validated @RequestBody ClusterTagRequest request) {
+ String currentUser = LoginUserUtils.getLoginUserDetail().getUsername();
+ return Response.success(clusterService.saveTag(request, currentUser));
+ }
+
+ @GetMapping(value = "/cluster/tag/get/{id}")
+ @ApiOperation(value = "Get cluster tag by id")
+ @ApiImplicitParam(name = "id", value = "Cluster ID", dataTypeClass = Integer.class, required = true)
+ public Response<ClusterTagResponse> getTag(@PathVariable Integer id) {
+ return Response.success(clusterService.getTag(id));
+ }
+
+ @PostMapping(value = "/cluster/tag/list")
+ @ApiOperation(value = "List cluster tags")
+ public Response<PageInfo<ClusterTagResponse>> listTag(@RequestBody ClusterTagPageRequest request) {
+ request.setCurrentUser(LoginUserUtils.getLoginUserDetail().getUsername());
+ return Response.success(clusterService.listTag(request));
+ }
+
+ @PostMapping(value = "/cluster/tag/update")
+ @OperationLog(operation = OperationType.UPDATE)
+ @ApiOperation(value = "Update cluster tag")
+ public Response<Boolean> updateTag(@Validated @RequestBody ClusterTagRequest request) {
+ String username = LoginUserUtils.getLoginUserDetail().getUsername();
+ return Response.success(clusterService.updateTag(request, username));
+ }
+
+ @DeleteMapping(value = "/cluster/tag/delete/{id}")
+ @ApiOperation(value = "Delete cluster tag by id")
+ @OperationLog(operation = OperationType.DELETE)
+ @ApiImplicitParam(name = "id", value = "Cluster ID", dataTypeClass = Integer.class, required = true)
+ @RequiresRoles(value = UserRoleCode.ADMIN)
+ public Response<Boolean> deleteTag(@PathVariable Integer id) {
+ return Response.success(clusterService.deleteTag(id, LoginUserUtils.getLoginUserDetail().getUsername()));
+ }
+
+ @PostMapping(value = "/cluster/save")
@ApiOperation(value = "Save cluster")
@OperationLog(operation = OperationType.CREATE)
@RequiresRoles(value = UserRoleCode.ADMIN)
- public Response<Integer> save(@RequestBody InlongClusterRequest request) {
+ public Response<Integer> save(@Validated @RequestBody InlongClusterRequest request) {
String currentUser = LoginUserUtils.getLoginUserDetail().getUsername();
return Response.success(clusterService.save(request, currentUser));
}
- @GetMapping(value = "/get/{id}")
+ @GetMapping(value = "/cluster/get/{id}")
@ApiOperation(value = "Get cluster by id")
@ApiImplicitParam(name = "id", value = "Cluster ID", dataTypeClass = Integer.class, required = true)
public Response<InlongClusterInfo> get(@PathVariable Integer id) {
return Response.success(clusterService.get(id));
}
- @PostMapping(value = "/list")
+ @PostMapping(value = "/cluster/list")
@ApiOperation(value = "List clusters")
public Response<PageInfo<InlongClusterInfo>> list(@RequestBody InlongClusterPageRequest request) {
return Response.success(clusterService.list(request));
}
- @PostMapping(value = "/update")
+ @PostMapping(value = "/cluster/update")
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update cluster")
- public Response<Boolean> update(@RequestBody InlongClusterRequest request) {
+ public Response<Boolean> update(@Validated @RequestBody InlongClusterRequest request) {
String username = LoginUserUtils.getLoginUserDetail().getUsername();
return Response.success(clusterService.update(request, username));
}
- @DeleteMapping(value = "/delete/{id}")
+ @DeleteMapping(value = "/cluster/delete/{id}")
@ApiOperation(value = "Delete cluster by id")
@OperationLog(operation = OperationType.DELETE)
@ApiImplicitParam(name = "id", value = "Cluster ID", dataTypeClass = Integer.class, required = true)
@@ -93,36 +136,36 @@ public class InlongClusterController {
return Response.success(clusterService.delete(id, LoginUserUtils.getLoginUserDetail().getUsername()));
}
- @PostMapping(value = "/node/save")
+ @PostMapping(value = "/cluster/node/save")
@ApiOperation(value = "Save cluster node")
@OperationLog(operation = OperationType.CREATE)
- public Response<Integer> saveNode(@RequestBody ClusterNodeRequest request) {
+ public Response<Integer> saveNode(@Validated @RequestBody ClusterNodeRequest request) {
String currentUser = LoginUserUtils.getLoginUserDetail().getUsername();
return Response.success(clusterService.saveNode(request, currentUser));
}
- @GetMapping(value = "/node/get/{id}")
+ @GetMapping(value = "/cluster/node/get/{id}")
@ApiOperation(value = "Get cluster node by id")
@ApiImplicitParam(name = "id", value = "Cluster node ID", dataTypeClass = Integer.class, required = true)
public Response<ClusterNodeResponse> getNode(@PathVariable Integer id) {
return Response.success(clusterService.getNode(id));
}
- @PostMapping(value = "/node/list")
+ @PostMapping(value = "/cluster/node/list")
@ApiOperation(value = "List cluster nodes")
public Response<PageInfo<ClusterNodeResponse>> listNode(@RequestBody InlongClusterPageRequest request) {
return Response.success(clusterService.listNode(request));
}
- @RequestMapping(value = "/node/update", method = RequestMethod.POST)
+ @RequestMapping(value = "/cluster/node/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update cluster node")
- public Response<Boolean> updateNode(@RequestBody ClusterNodeRequest request) {
+ public Response<Boolean> updateNode(@Validated @RequestBody ClusterNodeRequest request) {
String username = LoginUserUtils.getLoginUserDetail().getUsername();
return Response.success(clusterService.updateNode(request, username));
}
- @RequestMapping(value = "/node/delete/{id}", method = RequestMethod.DELETE)
+ @RequestMapping(value = "/cluster/node/delete/{id}", method = RequestMethod.DELETE)
@ApiOperation(value = "Delete cluster node")
@OperationLog(operation = OperationType.DELETE)
@ApiImplicitParam(name = "id", value = "Cluster node id", dataTypeClass = Integer.class, required = true)