You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 10:35:26 UTC
[inlong] branch master updated: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bfc7e3b3a [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
bfc7e3b3a is described below
commit bfc7e3b3a13c03878708e2ca995f121593ce3061
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:35:19 2023 +0800
[INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
---
.../inlong/agent/constant/AgentConstants.java | 2 +-
.../apache/inlong/agent/core/HeartbeatManager.java | 8 +-
inlong-agent/conf/agent.properties | 2 +-
.../inlong/common/heartbeat/HeartbeatMsg.java | 4 +-
.../inlong/manager/client/api/InlongClient.java | 9 ---
.../manager/client/api/impl/InlongClientImpl.java | 6 --
.../api/inner/client/InlongClusterClient.java | 13 ---
.../client/api/service/InlongClusterApi.java | 4 -
.../manager/common/consts/AgentConstants.java} | 33 +-------
.../dao/entity/InlongClusterNodeEntity.java | 1 -
.../manager/dao/entity/StreamSourceEntity.java | 2 +-
.../mappers/InlongClusterNodeEntityMapper.xml | 13 ++-
.../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
.../AgentClusterNodeBindGroupRequest.java} | 19 ++---
.../inlong/manager/pojo/source/SourceRequest.java | 4 +-
.../service/cluster/InlongClusterService.java | 10 ---
.../service/cluster/InlongClusterServiceImpl.java | 55 -------------
.../inlong/manager/service/core/AgentService.java | 8 ++
.../service/core/impl/AgentServiceImpl.java | 92 +++++++++++++++++++---
.../service/heartbeat/HeartbeatManager.java | 24 +++++-
.../service/core/impl/AgentServiceTest.java | 74 ++++++++---------
.../inlong/manager/service/mocks/MockAgent.java | 12 +--
.../main/resources/h2/apache_inlong_manager.sql | 3 +-
.../manager-web/sql/apache_inlong_manager.sql | 3 +-
inlong-manager/manager-web/sql/changes-1.5.0.sql | 6 +-
.../web/controller/InlongClusterController.java | 9 ---
.../web/controller/openapi/AgentController.java | 6 ++
27 files changed, 195 insertions(+), 241 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 2933b907b..eceb0dbf3 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -110,7 +110,7 @@ public class AgentConstants {
public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false;
- public static final String AGENT_NODE_TAG = "agent.node.tag";
+ public static final String AGENT_NODE_GROUP = "agent.node.group";
public static final String PROMETHEUS_EXPORTER_PORT = "agent.prometheus.exporter.port";
public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 067c92361..9451f8b8d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -51,7 +51,7 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_C
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_TAG;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
@@ -211,7 +211,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
final String clusterName = conf.get(AGENT_CLUSTER_NAME);
final String clusterTag = conf.get(AGENT_CLUSTER_TAG);
final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES);
- final String nodeTag = conf.get(AGENT_NODE_TAG);
+ final String nodeGroup = conf.get(AGENT_NODE_GROUP);
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
heartbeatMsg.setIp(agentIp);
@@ -227,8 +227,8 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
if (StringUtils.isNotBlank(inCharges)) {
heartbeatMsg.setInCharges(inCharges);
}
- if (StringUtils.isNotBlank(nodeTag)) {
- heartbeatMsg.setNodeTag(nodeTag);
+ if (StringUtils.isNotBlank(nodeGroup)) {
+ heartbeatMsg.setNodeGroup(nodeGroup);
}
Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs();
diff --git a/inlong-agent/conf/agent.properties b/inlong-agent/conf/agent.properties
index ee0942a22..e8dfd5911 100755
--- a/inlong-agent/conf/agent.properties
+++ b/inlong-agent/conf/agent.properties
@@ -42,7 +42,7 @@ thread.pool.await.time=30
agent.local.ip=127.0.0.1
agent.local.uuid=
agent.local.uuid.open=false
-agent.node.tag=default_tag
+agent.node.group=default_group
agent.enable.oom.exit=false
agent.custom.fixed.ip=
# max capacity of memory channel
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index 31d1eee65..3a54b714f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -76,9 +76,9 @@ public class HeartbeatMsg {
private String clusterTag;
/**
- * Tag of node, separated by commas(,)
+ * Groups of the node, used for filtering stream source collect tasks, separated by commas (,)
*/
- private String nodeTag;
+ private String nodeGroup;
/**
* Ext tag of cluster, key=value pairs separated by &
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index a1be17a92..d65660f3e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.client.api;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -261,12 +260,4 @@ public interface InlongClient {
* @return whether succeed
*/
Boolean deleteNode(Integer id);
-
- /**
- * Bind or unbind cluster tag node for cluster node.
- *
- * @param request cluster info to be modified
- * @return whether succeed
- */
- Boolean bindNodeTag(ClusterNodeBindTagRequest request);
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 69a538eea..c8f988e44 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -39,7 +39,6 @@ import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -288,11 +287,6 @@ public class InlongClientImpl implements InlongClient {
return clusterClient.deleteNode(id);
}
- @Override
- public Boolean bindNodeTag(ClusterNodeBindTagRequest request) {
- return clusterClient.bindNodeTag(request);
- }
-
private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus groupStatus, List<StreamSource> sources) {
Map<SimpleSourceStatus, List<StreamSource>> statusListMap = Maps.newHashMap();
sources.forEach(source -> {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
index ecf271102..7140bf595 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
@@ -23,7 +23,6 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -297,16 +296,4 @@ public class InlongClusterClient {
ClientUtils.assertRespSuccess(response);
return response.getData();
}
-
- /**
- * Bind or unbind cluster tag node for cluster node.
- *
- * @param request cluster info to be modified
- * @return whether succeed
- */
- public Boolean bindNodeTag(ClusterNodeBindTagRequest request) {
- Response<Boolean> response = ClientUtils.executeHttpCall(inlongClusterApi.bindNodeTag(request));
- ClientUtils.assertRespSuccess(response);
- return response.getData();
- }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
index 79b98bf9b..776aacec0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.client.api.service;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -99,7 +98,4 @@ public interface InlongClusterApi {
@DELETE("cluster/node/delete/{id}")
Call<Response<Boolean>> deleteNode(@Path("id") Integer id);
-
- @POST("cluster/node/bindTag")
- Call<Response<Boolean>> bindNodeTag(@Body ClusterNodeBindTagRequest request);
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
similarity index 50%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
index a084f72fb..85da5a876 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
@@ -15,37 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.dao.entity;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Date;
+package org.apache.inlong.manager.common.consts;
/**
- * Inlong cluster node entity, including parent id, type, ip, etc.
+ * Constant class for agent ext params
*/
-@Data
-public class InlongClusterNodeEntity implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private Integer id;
- private Integer parentId;
- private String type;
- private String ip;
- private Integer port;
- private String protocolType;
- private Integer nodeLoad;
- private String nodeTags;
- private String extParams;
- private String description;
-
- private Integer status;
- private Integer isDeleted;
- private String creator;
- private String modifier;
- private Date createTime;
- private Date modifyTime;
- private Integer version;
+public class AgentConstants {
+ public static final String AGENT_GROUP_KEY = "agentGroup";
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
index a084f72fb..b57aa6d26 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongClusterNodeEntity.java
@@ -36,7 +36,6 @@ public class InlongClusterNodeEntity implements Serializable {
private Integer port;
private String protocolType;
private Integer nodeLoad;
- private String nodeTags;
private String extParams;
private String description;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index c4de96c0f..561270f63 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -42,7 +42,7 @@ public class StreamSourceEntity implements Serializable {
private String dataNodeName;
private String inlongClusterName;
- private String inlongClusterNodeTag;
+ private String inlongClusterNodeGroup;
private String serializationType;
private String snapshot;
private Date reportTime;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 491dba3cd..6465d3c91 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -28,7 +28,6 @@
<result column="port" jdbcType="INTEGER" property="port"/>
<result column="protocol_type" jdbcType="VARCHAR" property="protocolType"/>
<result column="node_load" jdbcType="INTEGER" property="nodeLoad"/>
- <result column="node_tags" jdbcType="VARCHAR" property="nodeTags"/>
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
<result column="description" jdbcType="VARCHAR" property="description"/>
<result column="status" jdbcType="INTEGER" property="status"/>
@@ -40,7 +39,7 @@
<result column="version" jdbcType="INTEGER" property="version"/>
</resultMap>
<sql id="Base_Column_List">
- id, parent_id, type, ip, port, protocol_type, node_load, node_tags, ext_params, description,
+ id, parent_id, type, ip, port, protocol_type, node_load, ext_params, description,
status, is_deleted, creator, modifier, create_time, modify_time, version
</sql>
@@ -48,12 +47,12 @@
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
insert into inlong_cluster_node (id, parent_id, type,
ip, port, protocol_type,
- node_load, node_tags, ext_params,
+ node_load, ext_params,
description, status,
creator, modifier)
values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
#{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
- #{nodeLoad,jdbcType=INTEGER}, #{nodeTags,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+ #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
#{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
</insert>
@@ -62,14 +61,13 @@
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
insert into inlong_cluster_node (id, parent_id, type,
ip, port, protocol_type,
- node_load, node_tags, ext_params, status,
+ node_load, ext_params, status,
creator, modifier)
values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
#{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
- #{nodeLoad,jdbcType=INTEGER}, #{nodeTags,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
+ #{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
#{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
ON DUPLICATE KEY UPDATE node_load = VALUES(node_load),
- node_tags = VALUES(node_tags),
ext_params = VALUES(ext_params),
status = VALUES(status),
modifier = VALUES(modifier)
@@ -143,7 +141,6 @@
port = #{port,jdbcType=INTEGER},
protocol_type = #{protocolType,jdbcType=VARCHAR},
node_load = #{nodeLoad,jdbcType=INTEGER},
- node_tags = #{nodeTags,jdbcType=VARCHAR},
ext_params = #{extParams,jdbcType=LONGVARCHAR},
description = #{description,jdbcType=VARCHAR},
status = #{status,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 7ba947b1f..2858e986a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -31,7 +31,7 @@
<result column="uuid" jdbcType="VARCHAR" property="uuid"/>
<result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/>
<result column="inlong_cluster_name" jdbcType="VARCHAR" property="inlongClusterName"/>
- <result column="inlong_cluster_node_tag" jdbcType="VARCHAR" property="inlongClusterNodeTag"/>
+ <result column="inlong_cluster_node_group" jdbcType="VARCHAR" property="inlongClusterNodeGroup"/>
<result column="serialization_type" jdbcType="VARCHAR" property="serializationType"/>
<result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
<result column="report_time" jdbcType="TIMESTAMP" property="reportTime"/>
@@ -47,7 +47,7 @@
</resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, source_type, source_name, template_id, agent_ip, uuid,
- data_node_name, inlong_cluster_name, inlong_cluster_node_tag, serialization_type, snapshot, report_time,
+ data_node_name, inlong_cluster_name, inlong_cluster_node_group, serialization_type, snapshot, report_time,
ext_params, version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
</sql>
@@ -55,14 +55,14 @@
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
insert into stream_source (inlong_group_id, inlong_stream_id,
source_type, source_name, template_id, agent_ip,
- uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_tag,
+ uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_group,
serialization_type, snapshot,
report_time, ext_params, status,
previous_status, creator, modifier)
values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
#{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR},
- #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeTag,jdbcType=VARCHAR},
+ #{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeGroup,jdbcType=VARCHAR},
#{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
#{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
#{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
@@ -337,8 +337,8 @@
<if test="inlongClusterName != null">
inlong_cluster_name = #{inlongClusterName,jdbcType=INTEGER},
</if>
- <if test="inlongClusterNodeTag != null">
- inlong_cluster_node_tag = #{inlongClusterNodeTag,jdbcType=INTEGER},
+ <if test="inlongClusterNodeGroup != null">
+ inlong_cluster_node_group = #{inlongClusterNodeGroup,jdbcType=INTEGER},
</if>
<if test="serializationType != null">
serialization_type = #{serializationType,jdbcType=VARCHAR},
@@ -382,7 +382,7 @@
uuid = #{uuid,jdbcType=VARCHAR},
data_node_name = #{dataNodeName,jdbcType=VARCHAR},
inlong_cluster_name = #{inlongClusterName,jdbcType=VARCHAR},
- inlong_cluster_node_tag = #{inlongClusterNodeTag,jdbcType=VARCHAR},
+ inlong_cluster_node_group = #{inlongClusterNodeGroup,jdbcType=VARCHAR},
serialization_type = #{serializationType,jdbcType=VARCHAR},
snapshot = #{snapshot,jdbcType=LONGVARCHAR},
report_time = #{reportTime,jdbcType=TIMESTAMP},
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
similarity index 72%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
index ad6fe0673..493c03ab4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindTagRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeBindGroupRequest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.cluster;
+package org.apache.inlong.manager.pojo.cluster.agent;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@@ -25,24 +25,21 @@ import javax.validation.constraints.NotBlank;
import java.util.List;
/**
- * Inlong cluster node bind or unbind tag request
+ * Inlong cluster node bind or unbind group request. The group is used to distinguish which stream source tasks are collected.
*/
@Data
-@ApiModel("Cluster node bind and unbind tag request")
-public class ClusterNodeBindTagRequest {
+@ApiModel("Cluster node bind and unbind stream source label request, stream source label is a filter to judge "
+ + "whether to accept the stream source task")
+public class AgentClusterNodeBindGroupRequest {
- @NotBlank(message = "Cluster nodeTag cannot be blank")
- @ApiModelProperty(value = "Cluster node tag")
- private String clusterNodeTag;
+ @NotBlank(message = "Cluster agent group cannot be blank")
+ @ApiModelProperty(value = "Cluster agent group")
+ private String agentGroup;
@NotBlank(message = "clusterName cannot be blank")
@ApiModelProperty(value = "Cluster name")
private String clusterName;
- @NotBlank(message = "type cannot be blank")
- @ApiModelProperty(value = "Cluster type, including AGENT, DATAPROXY, etc.")
- private String type;
-
@ApiModelProperty(value = "Cluster node ip list which needs to bind tag")
private List<String> bindClusterNodes;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 099ab45e7..f0adcdd5e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -81,10 +81,10 @@ public class SourceRequest {
@Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
private String inlongClusterName;
- @ApiModelProperty("Inlong cluster node tag")
+ @ApiModelProperty("Inlong cluster node label for filtering stream source collect task")
@Length(min = 1, max = 128, message = "length must be between 1 and 128")
@Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
- private String inlongClusterNodeTag;
+ private String inlongClusterNodeGroup;
@ApiModelProperty("Data node name")
@Length(max = 128, message = "length must be less than or equal to 128")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 8ecccbc8b..fac486088 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -21,7 +21,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -386,15 +385,6 @@ public interface InlongClusterService {
*/
Boolean deleteNode(Integer id, String operator);
- /**
- * Bind or unbind cluster tag node for cluster node.
- *
- * @param request cluster info to be modified
- * @param operator current operator
- * @return whether succeed
- */
- Boolean bindNodeTag(ClusterNodeBindTagRequest request, String operator);
-
/**
* Delete cluster node.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index a76ec6dcd..598dd10a0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -54,7 +54,6 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.UserEntityMapper;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -1390,60 +1389,6 @@ public class InlongClusterServiceImpl implements InlongClusterService {
return true;
}
- @Override
- public Boolean bindNodeTag(ClusterNodeBindTagRequest request, String operator) {
- HashSet<String> bindSet = Sets.newHashSet();
- HashSet<String> unbindSet = Sets.newHashSet();
- if (request.getBindClusterNodes() != null) {
- bindSet.addAll(request.getBindClusterNodes());
- }
- if (request.getUnbindClusterNodes() != null) {
- unbindSet.addAll(request.getUnbindClusterNodes());
- }
- Preconditions.checkTrue(Sets.union(bindSet, unbindSet).size() == bindSet.size() + unbindSet.size(),
- "can not add and del node tag in the sameTime");
- InlongClusterEntity cluster = clusterMapper.selectByNameAndType(request.getClusterName(), request.getType());
- String message = "Current user does not have permission to bind cluster node tag";
- checkUser(cluster, operator, message);
-
- if (CollectionUtils.isNotEmpty(bindSet)) {
- bindSet.stream().flatMap(clusterNode -> {
- ClusterPageRequest pageRequest = new ClusterPageRequest();
- pageRequest.setParentId(cluster.getId());
- pageRequest.setType(request.getType());
- pageRequest.setKeyword(clusterNode);
- return clusterNodeMapper.selectByCondition(pageRequest).stream();
- }).filter(entity -> entity != null)
- .forEach(entity -> {
- String nodeTags = entity.getNodeTags();
- Set<String> tagSet = nodeTags == null ? Sets.newHashSet()
- : Sets.newHashSet(entity.getNodeTags().split(InlongConstants.COMMA));
- tagSet.add(request.getClusterNodeTag());
- entity.setNodeTags(String.join(InlongConstants.COMMA, tagSet));
- clusterNodeMapper.updateById(entity);
- });
- }
-
- if (CollectionUtils.isNotEmpty(unbindSet)) {
- unbindSet.stream().flatMap(clusterNode -> {
- ClusterPageRequest pageRequest = new ClusterPageRequest();
- pageRequest.setParentId(cluster.getId());
- pageRequest.setType(request.getType());
- pageRequest.setKeyword(clusterNode);
- return clusterNodeMapper.selectByCondition(pageRequest).stream();
- }).filter(entity -> entity != null)
- .forEach(entity -> {
- String nodeTags = entity.getNodeTags();
- Set<String> tagSet = nodeTags == null ? Sets.newHashSet()
- : Sets.newHashSet(entity.getNodeTags().split(InlongConstants.COMMA));
- tagSet.remove(request.getClusterNodeTag());
- entity.setNodeTags(String.join(InlongConstants.COMMA, tagSet));
- clusterNodeMapper.updateById(entity);
- });
- }
- return true;
- }
-
@Override
public DataProxyNodeResponse getDataProxyNodes(String groupId, String protocolType) {
LOGGER.debug("begin to get data proxy nodes for groupId={}, protocol={}", groupId, protocolType);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index e7db76937..099ebca72 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.core;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
/**
* The service interface for agent
@@ -49,4 +50,11 @@ public interface AgentService {
*/
TaskResult getTaskResult(TaskRequest request);
+ /**
+ * Divide the agent nodes into different groups, so that each group collects different stream source tasks.
+ *
+ * @param request request of the bind group; the implementation reads the agent cluster name, the agent
+ * group, and the cluster nodes to bind to / unbind from that group.
+ * @return Whether succeed.
+ */
+ Boolean bindGroup(AgentClusterNodeBindGroupRequest request);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index f6e1776ce..0efd42c32 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.core.impl;
import com.google.common.collect.Lists;
+import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
@@ -33,6 +34,7 @@ import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.manager.common.consts.AgentConstants;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.consts.SourceType;
@@ -54,6 +56,7 @@ import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
@@ -75,6 +78,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -92,6 +96,7 @@ public class AgentServiceImpl implements AgentService {
private static final int ISSUED_STATUS = 3;
private static final int MODULUS_100 = 100;
private static final int TASK_FETCH_SIZE = 2;
+ private static final Gson GSON = new Gson();
@Autowired
private StreamSourceEntityMapper sourceMapper;
@@ -192,6 +197,66 @@ public class AgentServiceImpl implements AgentService {
return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
}
+    /**
+     * Bind the agent group of the request to some nodes of an agent cluster, and unbind it from others.
+     *
+     * The groups of a node are kept as a comma-separated string under
+     * {@link AgentConstants#AGENT_GROUP_KEY} in the ext params JSON of the node.
+     *
+     * @param request carries the agent cluster name, the agent group, and the nodes to bind / unbind
+     * @return always true when the method completes
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
+    public Boolean bindGroup(AgentClusterNodeBindGroupRequest request) {
+        Preconditions.checkTrue(StringUtils.isNotBlank(request.getAgentGroup()), "agent group cannot be blank");
+        Set<String> bindSet = new HashSet<>();
+        Set<String> unbindSet = new HashSet<>();
+        if (request.getBindClusterNodes() != null) {
+            bindSet.addAll(request.getBindClusterNodes());
+        }
+        if (request.getUnbindClusterNodes() != null) {
+            unbindSet.addAll(request.getUnbindClusterNodes());
+        }
+        // a node listed in both sets would make the union smaller than the sum of the two sizes
+        Preconditions.checkTrue(Sets.union(bindSet, unbindSet).size() == bindSet.size() + unbindSet.size(),
+                "can not bind and unbind the same cluster node at the same time");
+        InlongClusterEntity cluster = clusterMapper.selectByNameAndType(request.getClusterName(), ClusterType.AGENT);
+        // fail with a clear message instead of a NullPointerException on cluster.getId()
+        Preconditions.checkNotNull(cluster, "agent cluster not found by name=" + request.getClusterName());
+
+        // same order as before: all binds first, then all unbinds
+        updateNodeGroups(cluster, bindSet, request.getAgentGroup(), true);
+        updateNodeGroups(cluster, unbindSet, request.getAgentGroup(), false);
+        return true;
+    }
+
+    /**
+     * Add the agent group to, or remove it from, every node of the cluster found for one of the keywords.
+     *
+     * @param cluster agent cluster the nodes belong to
+     * @param nodeKeywords keywords used to look up the cluster nodes
+     * @param agentGroup group to add or remove
+     * @param bind true to add the group, false to remove it
+     */
+    private void updateNodeGroups(InlongClusterEntity cluster, Set<String> nodeKeywords, String agentGroup,
+            boolean bind) {
+        for (String nodeKeyword : nodeKeywords) {
+            ClusterPageRequest pageRequest = new ClusterPageRequest();
+            pageRequest.setParentId(cluster.getId());
+            pageRequest.setType(ClusterType.AGENT);
+            pageRequest.setKeyword(nodeKeyword);
+            // NOTE(review): a keyword lookup may return more than one node - confirm this is intended
+            clusterNodeMapper.selectByCondition(pageRequest).stream()
+                    .filter(Objects::nonNull)
+                    .forEach(node -> {
+                        node.setExtParams(updateGroupInExtParams(node.getExtParams(), agentGroup, bind));
+                        clusterNodeMapper.insertOnDuplicateKeyUpdate(node);
+                    });
+        }
+    }
+
+    /**
+     * Return the ext params JSON with the agent group added to, or removed from, the stored group list.
+     *
+     * @param extParamsJson current ext params JSON of the node, may be null or empty
+     * @param agentGroup group to add or remove
+     * @param bind true to add the group, false to remove it
+     * @return the updated ext params JSON
+     */
+    @SuppressWarnings("unchecked")
+    private static String updateGroupInExtParams(String extParamsJson, String agentGroup, boolean bind) {
+        // Gson yields null for a null or empty JSON document, so fall back to an empty map
+        Map<String, String> extParams = extParamsJson == null ? null : GSON.fromJson(extParamsJson, Map.class);
+        if (extParams == null) {
+            extParams = new HashMap<>();
+        }
+        Set<String> groupSet = new HashSet<>();
+        String groups = extParams.get(AgentConstants.AGENT_GROUP_KEY);
+        if (groups != null) {
+            for (String group : groups.split(InlongConstants.COMMA)) {
+                // "".split(",") yields [""]; skip blanks so the stored list never gets empty entries
+                if (StringUtils.isNotBlank(group)) {
+                    groupSet.add(group);
+                }
+            }
+        }
+        if (bind) {
+            groupSet.add(agentGroup);
+        } else {
+            groupSet.remove(agentGroup);
+        }
+        extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet));
+        return GSON.toJson(extParams);
+    }
+
/**
* Query the tasks that source is waited to be operated.(only clusterName and ip matched it can be operated)
*
@@ -251,7 +316,7 @@ public class AgentServiceImpl implements AgentService {
private void preProcessFileTask(TaskRequest taskRequest) {
preProcessTemplateFileTask(taskRequest);
- preProcessTagFileTasks(taskRequest);
+ preProcessLabelFileTasks(taskRequest);
}
/**
@@ -306,7 +371,7 @@ public class AgentServiceImpl implements AgentService {
*
* @param taskRequest
*/
- private void preProcessTagFileTasks(TaskRequest taskRequest) {
+ private void preProcessLabelFileTasks(TaskRequest taskRequest) {
List<Integer> needProcessedStatusList = Arrays.asList(
SourceStatus.SOURCE_NORMAL.getCode(),
SourceStatus.SOURCE_FAILED.getCode(),
@@ -329,7 +394,7 @@ public class AgentServiceImpl implements AgentService {
Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
SourceStatus.SOURCE_FROZEN,
SourceStatus.TO_BE_ISSUED_FROZEN);
- if (!matchTag(sourceEntity, clusterNodeEntity)
+ if (!matchLabel(sourceEntity, clusterNodeEntity)
&& !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) {
LOGGER.info("Transform task({}) from {} to {} because tag mismatch "
+ "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
@@ -348,7 +413,7 @@ public class AgentServiceImpl implements AgentService {
SourceStatus.TO_BE_ISSUED_ACTIVE);
Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
- if (matchTag(sourceEntity, clusterNodeEntity)
+ if (matchLabel(sourceEntity, clusterNodeEntity)
&& !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
&& !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) {
LOGGER.info("Transform task({}) from {} to {} because tag rematch "
@@ -514,20 +579,21 @@ public class AgentServiceImpl implements AgentService {
}).collect(Collectors.toList());
}
+ /**
+ * Check whether the source task may run on the cluster node: a source without node group matches every
+ * node, otherwise at least one group of the source must be among the groups stored in the node's ext params.
+ */
- private boolean matchTag(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) {
+ private boolean matchLabel(StreamSourceEntity sourceEntity, InlongClusterNodeEntity clusterNodeEntity) {
 Preconditions.checkNotNull(sourceEntity, "cluster must be valid");
- if (sourceEntity.getInlongClusterNodeTag() == null) {
+ if (sourceEntity.getInlongClusterNodeGroup() == null) {
 return true;
 }
- if (clusterNodeEntity == null || clusterNodeEntity.getNodeTags() == null) {
+
+ if (clusterNodeEntity == null || clusterNodeEntity.getExtParams() == null) {
 return false;
 }
- Set<String> nodeTags = Stream.of(
- clusterNodeEntity.getNodeTags().split(InlongConstants.COMMA)).collect(Collectors.toSet());
- Set<String> sourceTags = Stream.of(
- sourceEntity.getInlongClusterNodeTag().split(InlongConstants.COMMA)).collect(Collectors.toSet());
- return sourceTags.stream().anyMatch(sourceTag -> nodeTags.contains(sourceTag));
+ Map<String, String> extParams = GSON.fromJson(clusterNodeEntity.getExtParams(), Map.class);
+ // Gson returns null for an empty JSON document; a node without stored groups matches no grouped source
+ if (extParams == null || extParams.get(AgentConstants.AGENT_GROUP_KEY) == null) {
+ return false;
+ }
+ Set<String> clusterNodeLabels = Sets.newHashSet(
+ extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
+ Set<String> sourceLabels = Stream.of(
+ sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
+ return sourceLabels.stream().anyMatch(sourceLabel -> clusterNodeLabels.contains(sourceLabel));
 }
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 7ccb7db5d..2fb8d5608 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.gson.Gson;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -30,6 +31,7 @@ import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.manager.common.consts.AgentConstants;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterStatus;
import org.apache.inlong.manager.common.enums.NodeStatus;
@@ -47,14 +49,21 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@Slf4j
@Component
public class HeartbeatManager implements AbstractHeartbeatManager {
private static final String AUTO_REGISTERED = "auto registered";
+ private static final Gson GSON = new Gson();
@Getter
private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
@@ -211,21 +220,30 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
clusterNode.setPort(Integer.valueOf(heartbeat.getPort()));
clusterNode.setProtocolType(heartbeat.getProtocolType());
clusterNode.setNodeLoad(heartbeat.getLoad());
- clusterNode.setNodeTags(heartbeat.getNodeTag());
clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
clusterNode.setCreator(creator);
clusterNode.setModifier(creator);
clusterNode.setDescription(AUTO_REGISTERED);
+ insertOrUpdateLabel(clusterNode, heartbeat);
return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
}
private int updateClusterNode(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
clusterNode.setNodeLoad(heartbeat.getLoad());
- clusterNode.setNodeTags(heartbeat.getNodeTag());
+ insertOrUpdateLabel(clusterNode, heartbeat);
return clusterNodeMapper.updateById(clusterNode);
}
+    /**
+     * Store the node groups reported by the heartbeat under {@link AgentConstants#AGENT_GROUP_KEY} in the
+     * ext params JSON of the cluster node, replacing the value stored there before. Only the entity is
+     * changed here; persisting it is left to the caller.
+     *
+     * @param clusterNode cluster node entity whose ext params are rewritten
+     * @param heartbeat heartbeat carrying the comma-separated node groups, which may be null
+     */
+    @SuppressWarnings("unchecked")
+    private void insertOrUpdateLabel(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
+        // drop empty entries: "".split(",") yields [""] and "a,,b" contains an empty group
+        Set<String> groupSet = heartbeat.getNodeGroup() == null ? new HashSet<>()
+                : Arrays.stream(heartbeat.getNodeGroup().split(InlongConstants.COMMA))
+                        .filter(group -> !group.isEmpty())
+                        .collect(Collectors.toSet());
+        // Gson yields null for a null or empty JSON document, so fall back to an empty map
+        Map<String, String> extParams = clusterNode.getExtParams() == null ? null
+                : GSON.fromJson(clusterNode.getExtParams(), Map.class);
+        if (extParams == null) {
+            extParams = new HashMap<>();
+        }
+        // NOTE(review): other ext params are round-tripped through an untyped Map - confirm that is lossless
+        extParams.put(AgentConstants.AGENT_GROUP_KEY, String.join(InlongConstants.COMMA, groupSet));
+        clusterNode.setExtParams(GSON.toJson(extParams));
+    }
+
private int deleteClusterNode(InlongClusterNodeEntity clusterNode) {
return clusterNodeMapper.deleteById(clusterNode.getId());
}
@@ -281,6 +299,6 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
if (oldHB == null) {
return true;
}
- return oldHB.getNodeTag() != newHB.getNodeTag() || oldHB.getLoad() != newHB.getLoad();
+ // compare by value: != on String (and on boxed numbers) only tells whether both are the same instance
+ return !java.util.Objects.equals(oldHB.getNodeGroup(), newHB.getNodeGroup())
+ || !java.util.Objects.equals(oldHB.getLoad(), newHB.getLoad());
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index f509f934b..2f85d1c9a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -78,7 +78,7 @@ class AgentServiceTest extends ServiceBaseTest {
private InlongStreamEntityMapper streamMapper;
private List<Pair<String, String>> groupStreamCache;
- private List<String> tagCache;
+ private List<String> groupCache;
/**
* Save template source
@@ -110,7 +110,7 @@ class AgentServiceTest extends ServiceBaseTest {
/**
* mock {@link StreamSourceService#save}
*/
- public Pair<String, String> saveSource(String tag) {
+ public Pair<String, String> saveSource(String group) {
String groupId = UUID.randomUUID().toString();
String streamId = UUID.randomUUID().toString();
groupStreamCache.add(new ImmutablePair<>(groupId, streamId));
@@ -121,9 +121,9 @@ class AgentServiceTest extends ServiceBaseTest {
sourceInfo.setInlongStreamId(streamId);
sourceInfo.setSourceType(SourceType.FILE);
sourceInfo.setInlongClusterName(MockAgent.CLUSTER_NAME);
- sourceInfo.setInlongClusterNodeTag(tag);
+ sourceInfo.setInlongClusterNodeGroup(group);
sourceInfo.setSourceName(
- String.format("Source task for cluster(%s) and tag(%s)", MockAgent.CLUSTER_NAME, tag));
+ String.format("Source task for cluster(%s) and group(%s)", MockAgent.CLUSTER_NAME, group));
sourceService.save(sourceInfo, GLOBAL_OPERATOR);
sourceService.updateStatus(
groupId,
@@ -170,7 +170,7 @@ class AgentServiceTest extends ServiceBaseTest {
@BeforeEach
public void setupEach() {
groupStreamCache = new ArrayList<>();
- tagCache = new ArrayList<>();
+ groupCache = new ArrayList<>();
}
@AfterEach
@@ -184,27 +184,27 @@ class AgentServiceTest extends ServiceBaseTest {
groupStreamCache.stream().map(Pair::getValue).collect(Collectors.toList()));
}
groupStreamCache.clear();
- tagCache.stream().forEach(tag -> bindTag(false, tag));;
+ groupCache.stream().forEach(group -> bindGroup(false, group));;
}
- private void bindTag(boolean bind, String tag) {
+ private void bindGroup(boolean bind, String group) {
if (bind) {
- tagCache.add(tag);
+ groupCache.add(group);
}
- agent.bindTag(bind, tag);
+ agent.bindGroup(bind, group);
}
/**
- * Test bind tag for node.
+ * Test bind group for node.
*/
@Test
- public void testTagMatch() {
- saveSource("tag1,tag3");
- saveSource("tag2,tag3");
- saveSource("tag2,tag3");
- saveSource("tag4");
- bindTag(true, "tag1");
- bindTag(true, "tag2");
+ public void testGroupMatch() {
+ saveSource("group1,group3");
+ saveSource("group2,group3");
+ saveSource("group2,group3");
+ saveSource("group4");
+ bindGroup(true, "group1");
+ bindGroup(true, "group2");
TaskResult taskResult = agent.pullTask();
Assertions.assertTrue(taskResult.getCmdConfigs().isEmpty());
@@ -220,12 +220,12 @@ class AgentServiceTest extends ServiceBaseTest {
}
/**
- * Test node tag mismatch source task and next time rematch source task.
+ * Test node group mismatch source task and next time rematch source task.
*/
@Test
- public void testTagMismatchAndRematch() {
- final Pair<String, String> groupStream = saveSource("tag1,tag3");
- bindTag(true, "tag1");
+ public void testGroupMismatchAndRematch() {
+ final Pair<String, String> groupStream = saveSource("group1,group3");
+ bindGroup(true, "group1");
agent.pullTask();
agent.pullTask(); // report last success status
@@ -235,8 +235,8 @@ class AgentServiceTest extends ServiceBaseTest {
.findAny()
.get()
.getId();
- // unbind tag and mismatch
- bindTag(false, "tag1");
+ // unbind group and mismatch
+ bindGroup(false, "group1");
TaskResult t1 = agent.pullTask();
Assertions.assertEquals(1, t1.getDataConfigs().size());
Assertions.assertEquals(1, t1.getDataConfigs().stream()
@@ -246,8 +246,8 @@ class AgentServiceTest extends ServiceBaseTest {
DataConfig d1 = t1.getDataConfigs().get(0);
Assertions.assertEquals(sourceId, d1.getTaskId());
- // bind tag and rematch
- bindTag(true, "tag1");
+ // bind group and rematch
+ bindGroup(true, "group1");
TaskResult t2 = agent.pullTask();
Assertions.assertEquals(1, t2.getDataConfigs().size());
Assertions.assertEquals(1, t2.getDataConfigs().stream()
@@ -263,14 +263,14 @@ class AgentServiceTest extends ServiceBaseTest {
*/
@Test
public void testSuspendFailWhenNotAck() {
- Pair<String, String> groupStream = saveSource("tag1,tag3");
- bindTag(true, "tag1");
+ Pair<String, String> groupStream = saveSource("group1,group3");
+ bindGroup(true, "group1");
agent.pullTask();
agent.pullTask(); // report last success status
// mismatch
- bindTag(false, "tag1");
+ bindGroup(false, "group1");
agent.pullTask();
// suspend
@@ -282,21 +282,21 @@ class AgentServiceTest extends ServiceBaseTest {
}
/**
- * Test node tag rematch source task but group suspend
+ * Test node group rematch source task but group suspend
*/
@Test
public void testRematchedWhenSuspend() {
- final Pair<String, String> groupStream = saveSource("tag1,tag3");
- bindTag(true, "tag1");
+ final Pair<String, String> groupStream = saveSource("group1,group3");
+ bindGroup(true, "group1");
agent.pullTask();
agent.pullTask(); // report last success status
// mismatch and rematch
- bindTag(false, "tag1");
+ bindGroup(false, "group1");
agent.pullTask();
agent.pullTask(); // report last to make it from 304 -> 104
- bindTag(true, "tag1");
+ bindGroup(true, "group1");
// suspend
suspendSource(groupStream.getLeft(), groupStream.getRight());
@@ -305,12 +305,12 @@ class AgentServiceTest extends ServiceBaseTest {
}
/**
- * Test node tag mismatch source task but group restart
+ * Test node group mismatch source task but group restart
*/
@Test
public void testMismatchedWhenRestart() {
- final Pair<String, String> groupStream = saveSource("tag1,tag3");
- bindTag(true, "tag1");
+ final Pair<String, String> groupStream = saveSource("group1,group3");
+ bindGroup(true, "group1");
agent.pullTask();
agent.pullTask(); // report last success status
@@ -318,7 +318,7 @@ class AgentServiceTest extends ServiceBaseTest {
// suspend and restart
suspendSource(groupStream.getLeft(), groupStream.getRight());
restartSource(groupStream.getLeft(), groupStream.getRight());
- bindTag(false, "tag1");
+ bindGroup(false, "group1");
TaskResult taskResult = agent.pullTask();
Assertions.assertEquals(1, taskResult.getDataConfigs().size());
Assertions.assertEquals(1, taskResult.getDataConfigs().stream()
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
index 2acb651f5..9e0061d18 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mocks/MockAgent.java
@@ -44,7 +44,7 @@ public class MockAgent {
// 2. Regularly report the previously executed tasks to the manager (may be successful or fail)
public static final String LOCAL_IP = "127.0.0.1";
public static final String LOCAL_PORT = "8008";
- public static final String LOCAL_TAG = "default_tag";
+ public static final String LOCAL_GROUP = "default_group";
public static final String CLUSTER_TAG = "default_cluster_tag";
public static final String CLUSTER_NAME = "1c59ef9e8e1e43cfb25ee8b744c9c81b_2790";
@@ -52,7 +52,7 @@ public class MockAgent {
private HeartbeatService heartbeatService;
private Queue<CommandEntity> commands = new LinkedList<>();
- private Set<String> tags = Sets.newHashSet(LOCAL_TAG);
+ private Set<String> groups = Sets.newHashSet(LOCAL_GROUP);
private int jobLimit;
public MockAgent(AgentService agentService, HeartbeatService heartbeatService, int jobLimit) {
@@ -83,17 +83,17 @@ public class MockAgent {
heartbeat.setComponentType(ComponentTypeEnum.Agent.getType());
heartbeat.setClusterName(CLUSTER_NAME);
heartbeat.setClusterTag(CLUSTER_TAG);
- heartbeat.setNodeTag(tags.stream().collect(Collectors.joining(InlongConstants.COMMA)));
+ heartbeat.setNodeGroup(groups.stream().collect(Collectors.joining(InlongConstants.COMMA)));
heartbeat.setInCharges(GLOBAL_OPERATOR);
heartbeat.setReportTime(System.currentTimeMillis());
heartbeatService.reportHeartbeat(heartbeat);
}
- public void bindTag(boolean bind, String tag) {
+ public void bindGroup(boolean bind, String group) {
if (bind) {
- tags.add(tag);
+ groups.add(group);
} else {
- tags.remove(tag);
+ groups.remove(group);
}
sendHeartbeat();
}
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 6af822e01..fc4938469 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -133,7 +133,6 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
`port` int(6) NULL COMMENT 'Cluster port',
`protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
`node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node',
- `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
`description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node',
`status` int(4) DEFAULT '0' COMMENT 'Cluster status',
@@ -330,7 +329,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
`inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
- `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag',
+ `inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node group',
`serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
`snapshot` mediumtext DEFAULT NULL COMMENT 'Snapshot of this source task',
`report_time` timestamp NULL COMMENT 'Snapshot time',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index f8bcb47fb..3036a7525 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -143,7 +143,6 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
`port` int(6) NULL COMMENT 'Cluster port',
`protocol_type` varchar(20) DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
`node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node',
- `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
`description` varchar(256) DEFAULT '' COMMENT 'Description of cluster node',
`status` int(4) DEFAULT '0' COMMENT 'Cluster status',
@@ -347,7 +346,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
`inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
- `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag',
+ `inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node group',
`serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
`snapshot` mediumtext DEFAULT NULL COMMENT 'Snapshot of this source task',
`report_time` timestamp NULL COMMENT 'Snapshot time',
diff --git a/inlong-manager/manager-web/sql/changes-1.5.0.sql b/inlong-manager/manager-web/sql/changes-1.5.0.sql
index 4b655bc2c..88ed142bd 100644
--- a/inlong-manager/manager-web/sql/changes-1.5.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.5.0.sql
@@ -32,9 +32,5 @@ ALTER TABLE `inlong_cluster_node`
ADD COLUMN `node_load` int(11) DEFAULT '-1' COMMENT 'Current load value of the node';
-ALTER TABLE `inlong_cluster_node`
- ADD COLUMN `node_tags` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag, separated by commas, only uniquely identified by parent_id and ip';
-
-
ALTER TABLE `stream_source`
- ADD COLUMN `inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag';
\ No newline at end of file
+ ADD COLUMN `inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node group';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index 3ea4634d3..df49e13ea 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -25,7 +25,6 @@ import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.cluster.BindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.pojo.cluster.ClusterNodeBindTagRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
@@ -228,14 +227,6 @@ public class InlongClusterController {
return Response.success(clusterService.deleteNode(id, LoginUserUtils.getLoginUser().getName()));
}
- @RequestMapping(value = "/cluster/node/bindTag")
- @OperationLog(operation = OperationType.UPDATE)
- @ApiOperation(value = "Bind or unbind cluster node tag")
- public Response<Boolean> bindNodeTag(@Validated @RequestBody ClusterNodeBindTagRequest request) {
- String username = LoginUserUtils.getLoginUser().getName();
- return Response.success(clusterService.bindNodeTag(request, username));
- }
-
@PostMapping("/cluster/testConnection")
@ApiOperation(value = "Test connection for inlong cluster")
public Response<Boolean> testConnection(@RequestBody ClusterRequest request) {
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index 88343f940..5e2270637 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiOperation;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.AgentService;
@@ -68,4 +69,9 @@ public class AgentController {
return Response.success(agentService.getTaskResult(request));
}
+    /**
+     * Endpoint that binds agent cluster nodes to, or unbinds them from, an agent group by delegating to
+     * the agent service.
+     *
+     * @param request bind group request taken from the request body
+     * @return success response wrapping the result of the agent service
+     */
+    @PostMapping(value = "/agent/bindGroup")
+    @ApiOperation(value = "Divide the agent into different groups, which collect different stream source tasks.")
+    public Response<Boolean> bindGroup(@RequestBody AgentClusterNodeBindGroupRequest request) {
+        Boolean bound = agentService.bindGroup(request);
+        return Response.success(bound);
+    }
}