You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/30 02:07:50 UTC
[inlong] 03/04: [INLONG-5729][Manager] Fix the failure of suspending or restarting the sources (#5730)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 2afe325afb48a2244f1a2b252f61b4acdc83dcb5
Author: healchow <he...@gmail.com>
AuthorDate: Mon Aug 29 19:27:57 2022 +0800
[INLONG-5729][Manager] Fix the failure of suspending or restarting the sources (#5730)
---
.../manager/client/api/impl/InlongGroupImpl.java | 51 ++++++++--------------
.../inlong/manager/pojo/group/InlongGroupInfo.java | 4 +-
.../manager/pojo/group/InlongGroupRequest.java | 4 +-
.../main/resources/h2/apache_inlong_manager.sql | 2 +-
.../manager-web/sql/apache_inlong_manager.sql | 18 ++++----
5 files changed, 31 insertions(+), 48 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 403ff81b2..348dcf3ea 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -24,7 +24,6 @@ import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
@@ -34,11 +33,11 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.sort.BaseSortConf;
@@ -147,41 +146,29 @@ public class InlongGroupImpl implements InlongGroup {
Preconditions.checkTrue(groupId != null && groupId.equals(this.groupInfo.getInlongGroupId()),
"groupId must be same");
- InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
- Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
- SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
- Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
- "inlong group is in init status, should not be updated");
-
InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
- groupInfo.setVersion(existGroupInfo.getVersion());
- InlongGroupRequest groupRequest = groupInfo.genRequest();
- Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
- String errMsg = idAndErr.getValue();
- Preconditions.checkNull(errMsg, errMsg);
-
- this.groupContext.setGroupInfo(groupInfo);
- this.groupInfo = groupInfo;
+ this.updateOpt(groupInfo);
+ this.groupInfo = this.groupContext.getGroupInfo();
}
@Override
public void update(BaseSortConf sortConf) throws Exception {
Preconditions.checkNotNull(sortConf, "sort conf cannot be null");
+ this.updateOpt(InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf));
+ }
- final String groupId = this.groupInfo.getInlongGroupId();
-
- InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
- Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
+ private void updateOpt(InlongGroupInfo groupInfo) {
+ InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupInfo.getInlongGroupId());
+ Preconditions.checkNotNull(existGroupInfo, "inlong group does not exist, cannot be updated");
SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
- "inlong group is in init status, should not be updated");
+ "inlong group is in init status, cannot be updated");
- InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf);
groupInfo.setVersion(existGroupInfo.getVersion());
- InlongGroupRequest groupRequest = groupInfo.genRequest();
- Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
+ Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
String errMsg = idAndErr.getValue();
Preconditions.checkNull(errMsg, errMsg);
+
this.groupContext.setGroupInfo(groupInfo);
}
@@ -207,11 +194,9 @@ public class InlongGroupImpl implements InlongGroup {
@Override
public InlongGroupContext suspend(boolean async) {
InlongGroupInfo groupInfo = groupContext.getGroupInfo();
- Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
- final String errMsg = idAndErr.getValue();
- final String groupId = idAndErr.getKey();
- Preconditions.checkNull(errMsg, errMsg);
- groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STOPPED, async);
+ this.updateOpt(groupInfo);
+
+ groupClient.operateInlongGroup(groupInfo.getInlongGroupId(), SimpleGroupStatus.STOPPED, async);
return generateSnapshot();
}
@@ -223,11 +208,9 @@ public class InlongGroupImpl implements InlongGroup {
@Override
public InlongGroupContext restart(boolean async) {
InlongGroupInfo groupInfo = groupContext.getGroupInfo();
- Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
- final String errMsg = idAndErr.getValue();
- final String groupId = idAndErr.getKey();
- Preconditions.checkNull(errMsg, errMsg);
- groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STARTED, async);
+ this.updateOpt(groupInfo);
+
+ groupClient.operateInlongGroup(groupInfo.getInlongGroupId(), SimpleGroupStatus.STARTED, async);
return generateSnapshot();
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
index 5f3b22754..0b8e93656 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
@@ -71,10 +71,10 @@ public abstract class InlongGroupInfo {
@Builder.Default
private Integer enableZookeeper = 0;
- @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
+ @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
private Integer enableCreateResource;
- @ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1: true")
+ @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
private Integer lightweight;
@ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index f687e43d9..3c07a9c79 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -70,10 +70,10 @@ public abstract class InlongGroupRequest {
@ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
private Integer enableZookeeper = 0;
- @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
+ @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
private Integer enableCreateResource = 1;
- @ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1: true")
+ @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
private Integer lightweight = 0;
@ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 99a22413c..b5e12851a 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
`enable_zookeeper` tinyint(1) DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
- `lightweight` tinyint(1) DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-false, 1-true',
+ `lightweight` tinyint(1) DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
`inlong_cluster_tag` varchar(128) DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
`ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e7bbf585b..25aae58ca 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -41,7 +41,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
`max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
`enable_zookeeper` tinyint(1) DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
- `lightweight` tinyint(1) DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-false, 1-true',
+ `lightweight` tinyint(1) DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
`inlong_cluster_tag` varchar(128) DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
`ext_params` text DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
@@ -402,7 +402,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
- `template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
+ `template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
@@ -576,7 +576,7 @@ CREATE TABLE IF NOT EXISTS `user`
`encrypt_version` int(11) DEFAULT NULL COMMENT 'Encryption key version',
`account_type` int(11) NOT NULL DEFAULT '1' COMMENT 'Account type, 0-manager 1-normal',
`due_date` datetime DEFAULT NULL COMMENT 'Due date for user',
- `ext_params` text COMMENT 'Json extension info',
+ `ext_params` text COMMENT 'Json extension info',
`status` int(11) DEFAULT '100' COMMENT 'Status',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
`creator` varchar(256) NOT NULL COMMENT 'Creator name',
@@ -794,8 +794,8 @@ CREATE TABLE IF NOT EXISTS `component_heartbeat`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`component` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
`instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
- `status_heartbeat` text DEFAULT NULL COMMENT 'Status heartbeat info',
- `metric_heartbeat` text DEFAULT NULL COMMENT 'Metric heartbeat info',
+ `status_heartbeat` text DEFAULT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text DEFAULT NULL COMMENT 'Metric heartbeat info',
`report_time` bigint(20) NOT NULL COMMENT 'Report time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
@@ -813,8 +813,8 @@ CREATE TABLE IF NOT EXISTS `group_heartbeat`
`component` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
`instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
`inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
- `status_heartbeat` text DEFAULT NULL COMMENT 'Status heartbeat info',
- `metric_heartbeat` text DEFAULT NULL COMMENT 'Metric heartbeat info',
+ `status_heartbeat` text DEFAULT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text DEFAULT NULL COMMENT 'Metric heartbeat info',
`report_time` bigint(20) NOT NULL COMMENT 'Report time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
@@ -833,8 +833,8 @@ CREATE TABLE IF NOT EXISTS `stream_heartbeat`
`instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
`inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
`inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
- `status_heartbeat` text DEFAULT NULL COMMENT 'Status heartbeat info',
- `metric_heartbeat` text DEFAULT NULL COMMENT 'Metric heartbeat info',
+ `status_heartbeat` text DEFAULT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text DEFAULT NULL COMMENT 'Metric heartbeat info',
`report_time` bigint(20) NOT NULL COMMENT 'Report time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',