You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/30 02:07:50 UTC

[inlong] 03/04: [INLONG-5729][Manager] Fix the failure of suspending or restarting the sources (#5730)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 2afe325afb48a2244f1a2b252f61b4acdc83dcb5
Author: healchow <he...@gmail.com>
AuthorDate: Mon Aug 29 19:27:57 2022 +0800

    [INLONG-5729][Manager] Fix the failure of suspending or restarting the sources (#5730)
---
 .../manager/client/api/impl/InlongGroupImpl.java   | 51 ++++++++--------------
 .../inlong/manager/pojo/group/InlongGroupInfo.java |  4 +-
 .../manager/pojo/group/InlongGroupRequest.java     |  4 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  2 +-
 .../manager-web/sql/apache_inlong_manager.sql      | 18 ++++----
 5 files changed, 31 insertions(+), 48 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 403ff81b2..348dcf3ea 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -24,7 +24,6 @@ import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupContext;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
@@ -34,11 +33,11 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.sort.BaseSortConf;
@@ -147,41 +146,29 @@ public class InlongGroupImpl implements InlongGroup {
         Preconditions.checkTrue(groupId != null && groupId.equals(this.groupInfo.getInlongGroupId()),
                 "groupId must be same");
 
-        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
-        Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
-        SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
-        Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
-                "inlong group is in init status, should not be updated");
-
         InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
-        groupInfo.setVersion(existGroupInfo.getVersion());
-        InlongGroupRequest groupRequest = groupInfo.genRequest();
-        Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
-        String errMsg = idAndErr.getValue();
-        Preconditions.checkNull(errMsg, errMsg);
-
-        this.groupContext.setGroupInfo(groupInfo);
-        this.groupInfo = groupInfo;
+        this.updateOpt(groupInfo);
+        this.groupInfo = this.groupContext.getGroupInfo();
     }
 
     @Override
     public void update(BaseSortConf sortConf) throws Exception {
         Preconditions.checkNotNull(sortConf, "sort conf cannot be null");
+        this.updateOpt(InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf));
+    }
 
-        final String groupId = this.groupInfo.getInlongGroupId();
-
-        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
-        Preconditions.checkNotNull(existGroupInfo, "inlong group does not exists, cannot update");
+    private void updateOpt(InlongGroupInfo groupInfo) {
+        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupInfo.getInlongGroupId());
+        Preconditions.checkNotNull(existGroupInfo, "inlong group does not exist, cannot be updated");
         SimpleGroupStatus status = SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
         Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
-                "inlong group is in init status, should not be updated");
+                "inlong group is in init status, cannot be updated");
 
-        InlongGroupInfo groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo, sortConf);
         groupInfo.setVersion(existGroupInfo.getVersion());
-        InlongGroupRequest groupRequest = groupInfo.genRequest();
-        Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
+        Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
         String errMsg = idAndErr.getValue();
         Preconditions.checkNull(errMsg, errMsg);
+
         this.groupContext.setGroupInfo(groupInfo);
     }
 
@@ -207,11 +194,9 @@ public class InlongGroupImpl implements InlongGroup {
     @Override
     public InlongGroupContext suspend(boolean async) {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
-        final String errMsg = idAndErr.getValue();
-        final String groupId = idAndErr.getKey();
-        Preconditions.checkNull(errMsg, errMsg);
-        groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STOPPED, async);
+        this.updateOpt(groupInfo);
+
+        groupClient.operateInlongGroup(groupInfo.getInlongGroupId(), SimpleGroupStatus.STOPPED, async);
         return generateSnapshot();
     }
 
@@ -223,11 +208,9 @@ public class InlongGroupImpl implements InlongGroup {
     @Override
     public InlongGroupContext restart(boolean async) {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        Pair<String, String> idAndErr = groupClient.updateGroup(groupInfo.genRequest());
-        final String errMsg = idAndErr.getValue();
-        final String groupId = idAndErr.getKey();
-        Preconditions.checkNull(errMsg, errMsg);
-        groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STARTED, async);
+        this.updateOpt(groupInfo);
+
+        groupClient.operateInlongGroup(groupInfo.getInlongGroupId(), SimpleGroupStatus.STARTED, async);
         return generateSnapshot();
     }
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
index 5f3b22754..0b8e93656 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
@@ -71,10 +71,10 @@ public abstract class InlongGroupInfo {
     @Builder.Default
     private Integer enableZookeeper = 0;
 
-    @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
+    @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
     private Integer enableCreateResource;
 
-    @ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1: true")
+    @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
     private Integer lightweight;
 
     @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index f687e43d9..3c07a9c79 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -70,10 +70,10 @@ public abstract class InlongGroupRequest {
     @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
     private Integer enableZookeeper = 0;
 
-    @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
+    @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
     private Integer enableCreateResource = 1;
 
-    @ApiModelProperty(value = "Whether to use lightweight mode, 0: false, 1: true")
+    @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
     private Integer lightweight = 0;
 
     @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 99a22413c..b5e12851a 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `max_length`             int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
     `enable_zookeeper`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(1)            DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
-    `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-false, 1-true',
+    `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
     `inlong_cluster_tag`     varchar(128)          DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
     `ext_params`             text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num',
     `in_charges`             varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e7bbf585b..25aae58ca 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -41,7 +41,7 @@ CREATE TABLE IF NOT EXISTS `inlong_group`
     `max_length`             int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
     `enable_zookeeper`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to enable the zookeeper, 0-disable, 1-enable',
     `enable_create_resource` tinyint(1)            DEFAULT '1' COMMENT 'Whether to enable create resource? 0-disable, 1-enable',
-    `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-false, 1-true',
+    `lightweight`            tinyint(1)            DEFAULT '0' COMMENT 'Whether to use lightweight mode, 0-no, 1-yes',
     `inlong_cluster_tag`     varchar(128)          DEFAULT NULL COMMENT 'The cluster tag, which links to inlong_cluster table',
     `ext_params`             text                  DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string, such as queue_module, partition_num',
     `in_charges`             varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
@@ -402,7 +402,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `inlong_stream_id`    varchar(256) NOT NULL COMMENT 'Inlong stream id',
     `source_name`         varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
     `source_type`         varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
-    `template_id`         int(11)      DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
+    `template_id`         int(11)               DEFAULT NULL COMMENT 'Id of the template task this agent belongs to',
     `agent_ip`            varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task',
     `uuid`                varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
@@ -576,7 +576,7 @@ CREATE TABLE IF NOT EXISTS `user`
     `encrypt_version` int(11)               DEFAULT NULL COMMENT 'Encryption key version',
     `account_type`    int(11)      NOT NULL DEFAULT '1' COMMENT 'Account type, 0-manager 1-normal',
     `due_date`        datetime              DEFAULT NULL COMMENT 'Due date for user',
-    `ext_params`      text         COMMENT 'Json extension info',
+    `ext_params`      text COMMENT 'Json extension info',
     `status`          int(11)               DEFAULT '100' COMMENT 'Status',
     `is_deleted`      int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
     `creator`         varchar(256) NOT NULL COMMENT 'Creator name',
@@ -794,8 +794,8 @@ CREATE TABLE IF NOT EXISTS `component_heartbeat`
     `id`               int(11)     NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `component`        varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
     `instance`         varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
-    `status_heartbeat` text        DEFAULT NULL COMMENT 'Status heartbeat info',
-    `metric_heartbeat` text        DEFAULT NULL COMMENT 'Metric heartbeat info',
+    `status_heartbeat` text                 DEFAULT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text                 DEFAULT NULL COMMENT 'Metric heartbeat info',
     `report_time`      bigint(20)  NOT NULL COMMENT 'Report time',
     `create_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
@@ -813,8 +813,8 @@ CREATE TABLE IF NOT EXISTS `group_heartbeat`
     `component`        varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
     `instance`         varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
     `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
-    `status_heartbeat` text         DEFAULT NULL COMMENT 'Status heartbeat info',
-    `metric_heartbeat` text         DEFAULT NULL COMMENT 'Metric heartbeat info',
+    `status_heartbeat` text                  DEFAULT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text                  DEFAULT NULL COMMENT 'Metric heartbeat info',
     `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
     `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
@@ -833,8 +833,8 @@ CREATE TABLE IF NOT EXISTS `stream_heartbeat`
     `instance`         varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
     `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
     `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
-    `status_heartbeat` text         DEFAULT NULL COMMENT 'Status heartbeat info',
-    `metric_heartbeat` text         DEFAULT NULL COMMENT 'Metric heartbeat info',
+    `status_heartbeat` text                  DEFAULT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text                  DEFAULT NULL COMMENT 'Metric heartbeat info',
     `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
     `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',