You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/05 08:02:05 UTC

[inlong] branch branch-1.5 updated (c7af0fcc0 -> 3db90d470)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from c7af0fcc0 [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
     new a4f707b60 [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
     new 871027aa4 [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
     new d2fe1614c [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
     new 903183c6e [INLONG-7144][Manager] Add interface field limit (#7147)
     new 9e6782663 [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)
     new d4b128d63 [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
     new 3db90d470 [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 inlong-dashboard/src/locales/cn.json               |   5 +-
 inlong-dashboard/src/locales/en.json               |   5 +-
 .../src/metas/consumes/common/status.tsx           |   8 +-
 .../src/pages/Clusters/CreateModal.tsx             |  29 ++++-
 inlong-dashboard/src/pages/Nodes/DetailModal.tsx   |  30 ++++-
 .../manager/pojo/cluster/BindTagRequest.java       |   6 +
 .../manager/pojo/cluster/ClusterNodeRequest.java   |   7 ++
 .../manager/pojo/cluster/ClusterRequest.java       |  13 ++
 .../manager/pojo/cluster/ClusterTagRequest.java    |   7 ++
 .../manager/pojo/group/InlongGroupExtInfo.java     |   8 ++
 .../manager/pojo/group/InlongGroupRequest.java     |  12 +-
 .../pojo/group/InlongGroupResetRequest.java        |   5 +-
 .../inlong/manager/pojo/node/DataNodeRequest.java  |  11 ++
 .../inlong/manager/pojo/sink/SinkRequest.java      |  16 ++-
 .../manager/pojo/sort/util/LoadNodeUtils.java      |   2 +-
 .../inlong/manager/pojo/source/SourceRequest.java  |  14 +++
 .../manager/pojo/stream/InlongStreamRequest.java   |  14 ++-
 .../manager/pojo/transform/TransformRequest.java   |   8 ++
 .../inlong/manager/pojo/user/UserRequest.java      |  10 ++
 .../service/sink/mysql/MySQLSinkOperator.java      |  10 ++
 .../service/source/kafka/KafkaSourceOperator.java  |   6 +-
 .../apache/inlong/sdk/sort/api/ClientContext.java  |   2 +-
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java   | 138 +++++++++++----------
 .../protocol/node/extract/KafkaExtractNode.java    |  53 ++++++--
 .../node/extract/KafkaExtractNodeTest.java         |  22 ++++
 25 files changed, 347 insertions(+), 94 deletions(-)


[inlong] 06/07: [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d4b128d63304b5e85263ff07833f2cf1473c537f
Author: Lizhen <88...@users.noreply.github.com>
AuthorDate: Thu Jan 5 14:22:29 2023 +0800

    [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
---
 inlong-dashboard/src/metas/consumes/common/status.tsx | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/inlong-dashboard/src/metas/consumes/common/status.tsx b/inlong-dashboard/src/metas/consumes/common/status.tsx
index 83efe863e..ca7e21e48 100644
--- a/inlong-dashboard/src/metas/consumes/common/status.tsx
+++ b/inlong-dashboard/src/metas/consumes/common/status.tsx
@@ -38,22 +38,22 @@ export const statusList: StatusProp[] = [
   },
   {
     label: i18n.t('pages.Approvals.status.Processing'),
-    value: 11,
+    value: 101,
     type: 'warning',
   },
   {
     label: i18n.t('pages.Approvals.status.Rejected'),
-    value: 20,
+    value: 102,
     type: 'error',
   },
   {
     label: i18n.t('pages.Approvals.status.Ok'),
-    value: 21,
+    value: 103,
     type: 'success',
   },
   {
     label: i18n.t('pages.Approvals.status.Canceled'),
-    value: 22,
+    value: 104,
     type: 'error',
   },
 ];


[inlong] 05/07: [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9e678266313b3025bfd5416b6adb413a4754a548
Author: Lizhen <88...@users.noreply.github.com>
AuthorDate: Thu Jan 5 14:21:15 2023 +0800

    [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)
---
 inlong-dashboard/src/locales/cn.json               |  5 +++-
 inlong-dashboard/src/locales/en.json               |  5 +++-
 .../src/pages/Clusters/CreateModal.tsx             | 29 +++++++++++++++++++--
 inlong-dashboard/src/pages/Nodes/DetailModal.tsx   | 30 ++++++++++++++++++++--
 4 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 31a32a43f..bbb003abe 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -3,6 +3,7 @@
   "basic.Detail": "详情",
   "basic.Operating": "操作",
   "basic.OperatingSuccess": "操作成功",
+  "basic.ConnectionSuccess": "连接成功",
   "basic.Save": "保存",
   "basic.Cancel": "取消",
   "basic.Create": "新建",
@@ -587,6 +588,7 @@
   "pages.Clusters.Tag": "集群标签",
   "pages.Clusters.InCharges": "责任人",
   "pages.Clusters.Description": "集群描述",
+  "pages.Clusters.TestConnection": "测试连接",
   "pages.Clusters.Node.Name": "节点",
   "pages.Clusters.Node.Port": "端口",
   "pages.Clusters.Node.ProtocolType": "协议类型",
@@ -636,5 +638,6 @@
   "pages.ApprovalManagement.Approvers": "审批者",
   "pages.ApprovalManagement.Creator": "创建人",
   "pages.ApprovalManagement.Modifier": "修改人",
-  "pages.ApprovalManagement.CreateProcess": "新建流程"
+  "pages.ApprovalManagement.CreateProcess": "新建流程",
+  "pages.Nodes.TestConnection": "测试连接"
 }
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 20eac2c6d..1b8551a20 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -3,6 +3,7 @@
   "basic.Detail": "Detail",
   "basic.Operating": "Operate",
   "basic.OperatingSuccess": "Operating Success",
+  "basic.ConnectionSuccess": "Connection Success",
   "basic.Save": "Save",
   "basic.Cancel": "Cancel",
   "basic.Create": "Create",
@@ -587,6 +588,7 @@
   "pages.Clusters.Tag": "Cluster Tag",
   "pages.Clusters.InCharges": "Owners",
   "pages.Clusters.Description": "Description",
+  "pages.Clusters.TestConnection": "Test Connection",
   "pages.Clusters.Node.Name": "Node",
   "pages.Clusters.Node.Port": "Port",
   "pages.Clusters.Node.ProtocolType": "Protocol Type",
@@ -636,5 +638,6 @@
   "pages.ApprovalManagement.Approvers": "Approvers",
   "pages.ApprovalManagement.Creator": "Creator Name",
   "pages.ApprovalManagement.Modifier": "Modifier Name",
-  "pages.ApprovalManagement.CreateProcess": "Create Process"
+  "pages.ApprovalManagement.CreateProcess": "Create Process",
+  "pages.Nodes.TestConnection": "Test Connection"
 }
diff --git a/inlong-dashboard/src/pages/Clusters/CreateModal.tsx b/inlong-dashboard/src/pages/Clusters/CreateModal.tsx
index 1c442ae82..252c99445 100644
--- a/inlong-dashboard/src/pages/Clusters/CreateModal.tsx
+++ b/inlong-dashboard/src/pages/Clusters/CreateModal.tsx
@@ -18,7 +18,7 @@
  */
 
 import React, { useState, useMemo } from 'react';
-import { Modal, message } from 'antd';
+import { Modal, message, Button } from 'antd';
 import { ModalProps } from 'antd/es/modal';
 import FormGenerator, { useForm } from '@/components/FormGenerator';
 import { useRequest, useUpdateEffect } from '@/hooks';
@@ -78,6 +78,21 @@ const Comp: React.FC<Props> = ({ id, defaultType, ...modalProps }) => {
     message.success(i18n.t('basic.OperatingSuccess'));
   };
 
+  const testConnection = async () => {
+    const values = await form.validateFields();
+    const submitData = {
+      ...values,
+      inCharges: values.inCharges?.join(','),
+      clusterTags: values.clusterTags?.join(','),
+    };
+    await request({
+      url: '/cluster/testConnection',
+      method: 'POST',
+      data: submitData,
+    });
+    message.success(i18n.t('basic.ConnectionSuccess'));
+  };
+
   useUpdateEffect(() => {
     if (modalProps.visible) {
       if (id) {
@@ -101,7 +116,17 @@ const Comp: React.FC<Props> = ({ id, defaultType, ...modalProps }) => {
     <Modal
       {...modalProps}
       title={id ? i18n.t('pages.Clusters.Edit') : i18n.t('pages.Clusters.Create')}
-      onOk={onOk}
+      footer={[
+        <Button key="cancel" onClick={modalProps.onCancel}>
+          {i18n.t('basic.Cancel')}
+        </Button>,
+        <Button key="save" type="primary" onClick={onOk}>
+          {i18n.t('basic.Save')}
+        </Button>,
+        <Button key="run" type="primary" onClick={testConnection}>
+          {i18n.t('pages.Clusters.TestConnection')}
+        </Button>,
+      ]}
     >
       <FormGenerator
         content={content}
diff --git a/inlong-dashboard/src/pages/Nodes/DetailModal.tsx b/inlong-dashboard/src/pages/Nodes/DetailModal.tsx
index 621157118..4afdc4371 100644
--- a/inlong-dashboard/src/pages/Nodes/DetailModal.tsx
+++ b/inlong-dashboard/src/pages/Nodes/DetailModal.tsx
@@ -18,13 +18,14 @@
  */
 
 import React, { useState, useMemo } from 'react';
-import { Modal, message } from 'antd';
+import { Modal, message, Button } from 'antd';
 import { ModalProps } from 'antd/es/modal';
 import FormGenerator, { useForm } from '@/components/FormGenerator';
 import { useUpdateEffect } from '@/hooks';
 import { dao } from '@/metas/nodes';
 import { useDefaultMeta, useLoadMeta, NodeMetaType } from '@/metas';
 import i18n from '@/i18n';
+import request from '@/utils/request';
 
 const { useFindNodeDao, useSaveNodeDao } = dao;
 
@@ -63,6 +64,21 @@ const Comp: React.FC<Props> = ({ id, defaultType, ...modalProps }) => {
     message.success(i18n.t('basic.OperatingSuccess'));
   };
 
+  const testConnection = async () => {
+    const values = await form.validateFields();
+    const submitData = {
+      ...values,
+      inCharges: values.inCharges?.join(','),
+      clusterTags: values.clusterTags?.join(','),
+    };
+    await request({
+      url: '/node/testConnection',
+      method: 'POST',
+      data: submitData,
+    });
+    message.success(i18n.t('basic.ConnectionSuccess'));
+  };
+
   useUpdateEffect(() => {
     if (modalProps.visible) {
       // open
@@ -88,7 +104,17 @@ const Comp: React.FC<Props> = ({ id, defaultType, ...modalProps }) => {
       {...modalProps}
       width={720}
       title={id ? i18n.t('basic.Detail') : i18n.t('basic.Create')}
-      onOk={onOk}
+      footer={[
+        <Button key="cancel" onClick={modalProps.onCancel}>
+          {i18n.t('basic.Cancel')}
+        </Button>,
+        <Button key="save" type="primary" onClick={onOk}>
+          {i18n.t('basic.Save')}
+        </Button>,
+        <Button key="run" type="primary" onClick={testConnection}>
+          {i18n.t('pages.Nodes.TestConnection')}
+        </Button>,
+      ]}
     >
       <FormGenerator
         content={content}


[inlong] 01/07: [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit a4f707b60d1bfcf1410cb74e2da0bbcdc4a9ded0
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Wed Jan 4 21:20:33 2023 +0800

    [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
---
 .../java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index d8f47eda3..1eb3e1398 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -289,7 +289,7 @@ public class LoadNodeUtils {
                 null,
                 null,
                 properties,
-                ckSink.getTableName(),
+                ckSink.getDbName() + "." + ckSink.getTableName(),
                 ckSink.getJdbcUrl() + "/" + ckSink.getDbName(),
                 ckSink.getUsername(),
                 ckSink.getPassword(),


[inlong] 02/07: [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 871027aa47d7ccde99a326266da81b73d310bef0
Author: haifxu <xh...@gmail.com>
AuthorDate: Thu Jan 5 09:49:52 2023 +0800

    [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
---
 .../inlong/manager/service/sink/mysql/MySQLSinkOperator.java   | 10 ++++++++++
 .../manager/service/source/kafka/KafkaSourceOperator.java      |  6 +++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index cafa79125..a81b9ff34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -18,9 +18,11 @@
 package org.apache.inlong.manager.service.sink.mysql;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,14 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
         }
 
         MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            String dataNodeName = entity.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty");
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            dto.setPassword(dataNodeInfo.getToken());
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 6b6dff0cb..686b81c39 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.kafka;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -118,7 +119,10 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
                     continue;
                 }
-                kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+                if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
+                        sourceInfo.getSerializationType())) {
+                    kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+                }
             }
 
             kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());


[inlong] 04/07: [INLONG-7144][Manager] Add interface field limit (#7147)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 903183c6efa578698aca5441b41c79816aaaba0d
Author: Goson Zhang <46...@qq.com>
AuthorDate: Thu Jan 5 11:03:59 2023 +0800

    [INLONG-7144][Manager] Add interface field limit (#7147)
---
 .../inlong/manager/pojo/cluster/BindTagRequest.java      |  6 ++++++
 .../inlong/manager/pojo/cluster/ClusterNodeRequest.java  |  7 +++++++
 .../inlong/manager/pojo/cluster/ClusterRequest.java      | 13 +++++++++++++
 .../inlong/manager/pojo/cluster/ClusterTagRequest.java   |  7 +++++++
 .../inlong/manager/pojo/group/InlongGroupExtInfo.java    |  8 ++++++++
 .../inlong/manager/pojo/group/InlongGroupRequest.java    | 12 +++++++++++-
 .../manager/pojo/group/InlongGroupResetRequest.java      |  5 ++---
 .../apache/inlong/manager/pojo/node/DataNodeRequest.java | 11 +++++++++++
 .../org/apache/inlong/manager/pojo/sink/SinkRequest.java | 16 +++++++++++++++-
 .../apache/inlong/manager/pojo/source/SourceRequest.java | 14 ++++++++++++++
 .../inlong/manager/pojo/stream/InlongStreamRequest.java  | 14 +++++++++++++-
 .../inlong/manager/pojo/transform/TransformRequest.java  |  8 ++++++++
 .../org/apache/inlong/manager/pojo/user/UserRequest.java | 10 ++++++++++
 13 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
index 30d3a1d4f..d701e4d6e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
@@ -17,11 +17,15 @@
 
 package org.apache.inlong.manager.pojo.cluster;
 
+import org.hibernate.validator.constraints.Length;
+
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
 import java.util.List;
 
 /**
@@ -33,6 +37,8 @@ public class BindTagRequest {
 
     @NotBlank(message = "clusterTag cannot be blank")
     @ApiModelProperty(value = "Cluster tag")
+    @Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String clusterTag;
 
     @ApiModelProperty(value = "Cluster-ID list which needs to bind tag")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
index 25821a393..5e789100b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
@@ -42,27 +43,33 @@ public class ClusterNodeRequest {
 
     @NotBlank(message = "type cannot be blank")
     @ApiModelProperty(value = "Cluster type, including AGENT, DATAPROXY, etc.")
+    @Length(min = 1, max = 20, message = "length must be between 1 and 20")
     private String type;
 
     @NotBlank(message = "ip cannot be blank")
     @ApiModelProperty(value = "Cluster IP")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String ip;
 
     @NotNull(message = "port cannot be null")
     @ApiModelProperty(value = "Cluster port")
+    @Length(max = 6, message = "length must be less than or equal to 6")
     private Integer port;
 
     @NotBlank(message = "protocolType cannot be blank")
     @ApiModelProperty(value = "Cluster protocol type")
+    @Length(max = 20, message = "length must be less than or equal to 20")
     private String protocolType;
 
     @ApiModelProperty(value = "Current load value of the node")
     private Integer nodeLoad;
 
     @ApiModelProperty(value = "Extended params")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
     private String extParams;
 
     @ApiModelProperty(value = "Description of the cluster node")
+    @Length(max = 256, message = "length must be less than or equal to 256")
     private String description;
 
     @ApiModelProperty(value = "Version number")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
index 90f07a315..ccb411af5 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java
@@ -24,9 +24,11 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
 
 /**
  * Inlong cluster request
@@ -44,35 +46,46 @@ public abstract class ClusterRequest {
 
     @NotBlank(message = "cluster name cannot be blank")
     @ApiModelProperty(value = "Cluster name")
+    @Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String name;
 
     @NotBlank(message = "cluster type cannot be blank")
     @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, KAFKA, DATAPROXY, etc.")
+    @Length(min = 1, max = 20, message = "length must be between 1 and 20")
     private String type;
 
     @ApiModelProperty(value = "Cluster url")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String url;
 
     @NotBlank(message = "clusterTags cannot be blank")
     @ApiModelProperty(value = "Cluster tags, separated by commas")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String clusterTags;
 
     @ApiModelProperty(value = "Extension tag")
+    @Length(max = 128, message = "length must be less than or equal to 128")
     private String extTag = "default=true";
 
     @ApiModelProperty(value = "Cluster token")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String token;
 
     @ApiModelProperty(value = "Cluster heartbeat info")
+    @Length(max = 163840, message = "length must be less than or equal to 163840")
     private String heartbeat;
 
     @ApiModelProperty(value = "Extended params")
+    @Length(max = 163840, message = "length must be less than or equal to 163840")
     private String extParams;
 
     @ApiModelProperty(value = "Description of the cluster")
+    @Length(max = 256, message = "length must be less than or equal to 256")
     private String description;
 
     @ApiModelProperty(value = "Name of responsible person, separated by commas")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String inCharges;
 
     @ApiModelProperty(value = "Version number")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
index 289b89e2e..69d8c5540 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterTagRequest.java
@@ -21,9 +21,11 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
 
 /**
  * Inlong cluster tag request
@@ -38,15 +40,20 @@ public class ClusterTagRequest {
 
     @NotBlank(message = "clusterTag cannot be blank")
     @ApiModelProperty(value = "Cluster tag")
+    @Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String clusterTag;
 
     @ApiModelProperty(value = "Extended params")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
     private String extParams;
 
     @ApiModelProperty(value = "Description of the cluster tag")
+    @Length(max = 256, message = "length must be less than or equal to 256")
     private String description;
 
     @ApiModelProperty(value = "Name of in charges, separated by commas")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String inCharges;
 
     @ApiModelProperty(value = "Version number")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupExtInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupExtInfo.java
index e539e868f..f1baef4f0 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupExtInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupExtInfo.java
@@ -17,6 +17,10 @@
 
 package org.apache.inlong.manager.pojo.group;
 
+import org.hibernate.validator.constraints.Length;
+
+import javax.validation.constraints.Pattern;
+
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
@@ -38,12 +42,16 @@ public class InlongGroupExtInfo {
     private Integer id;
 
     @ApiModelProperty(value = "inlong group id", required = true)
+    @Length(min = 4, max = 100, message = "length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
     @ApiModelProperty(value = "property name")
+    @Length(max = 256, message = "length must be less than or equal to 256")
     private String keyName;
 
     @ApiModelProperty(value = "property value")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
     private String keyValue;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index dfa95d31a..9b68c572d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -50,9 +50,11 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
     private String inlongGroupId;
 
     @ApiModelProperty(value = "Inlong group name", required = true)
+    @Length(max = 128, message = "length must be less than or equal to 128")
     private String name;
 
     @ApiModelProperty(value = "Inlong group description")
+    @Length(max = 256, message = "length must be less than or equal to 256")
     private String description;
 
     @Deprecated
@@ -61,25 +63,30 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
 
     @NotBlank(message = "cannot be blank")
     @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+    @Length(min = 1, max = 20, message = "length must be between 1 and 20")
     private String mqType;
 
     @ApiModelProperty(value = "MQ resource", notes = "in inlong group, TubeMQ corresponds to Topic, Pulsar corresponds to Namespace")
+    @Length(max = 64, message = "length must be less than or equal to 64")
     private String mqResource;
 
     @ApiModelProperty(value = "TubeMQ master URL")
     private String tubeMaster;
 
     @ApiModelProperty(value = "Whether to enable zookeeper? 0: disable, 1: enable")
+    @Range(min = 0, max = 1, message = "default is 0, only supports [0: disable, 1: enable]")
     private Integer enableZookeeper = 0;
 
     @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
+    @Range(min = 0, max = 1, message = "default is 1, only supports [0: disable, 1: enable]")
     private Integer enableCreateResource = 1;
 
     @ApiModelProperty(value = "Whether to use lightweight mode, 0: no, 1: yes")
+    @Range(min = 0, max = 1, message = "default is 0, only supports [0: no, 1: yes]")
     private Integer lightweight = 0;
 
     @NotNull(message = "cannot be null")
-    @Range(min = 0, max = 2, message = "only supports 0, 1, 2")
+    @Range(min = 0, max = 2, message = "default is 0, only supports [0, 1, 2]")
     @ApiModelProperty(value = "Data report type, default is 0.\n"
             + " 0: report to DataProxy and respond when the DataProxy received data.\n"
             + " 1: report to DataProxy and respond after DataProxy sends data.\n"
@@ -87,6 +94,7 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
     private Integer dataReportType = 0;
 
     @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
+    @Length(max = 128, message = "Length must be less than or equal to 128")
     private String inlongClusterTag;
 
     @ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
@@ -103,9 +111,11 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
 
     @NotBlank(message = "cannot be blank")
     @ApiModelProperty(value = "Name of responsible person, separated by commas")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String inCharges;
 
     @ApiModelProperty(value = "Name of followers, separated by commas")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String followers;
 
     @ApiModelProperty(value = "Inlong group Extension properties")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupResetRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupResetRequest.java
index c6cefb120..5a577d6a4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupResetRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupResetRequest.java
@@ -38,9 +38,8 @@ import javax.validation.constraints.Pattern;
 public class InlongGroupResetRequest {
 
     @ApiModelProperty(value = "Inlong group id", required = true)
-    @Length(min = 4, max = 200)
-    @Pattern(regexp = "^(?![0-9]+$)[a-z][a-z0-9_-]{1,200}$", message = "inlongGroupId must starts with a lowercase letter "
-            + "and contains only lowercase letters, digits, `-` or `_`")
+    @Length(min = 4, max = 100, message = "length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
     @ApiModelProperty(value = "If rerun process when group is in operating, 0: false 1: true")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
index 4f520bf21..d4c3d89cb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
@@ -24,9 +24,11 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
 
 /**
  * Data node request
@@ -44,29 +46,38 @@ public abstract class DataNodeRequest {
 
     @NotBlank(message = "node name cannot be blank")
     @ApiModelProperty(value = "Data node name")
+    @Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[A-Za-z0-9_-]{1,128}$", message = "only supports letters, numbers, '-', or '_'")
     private String name;
 
     @NotBlank(message = "node type cannot be blank")
     @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+    @Length(max = 20, message = "length must be less than or equal to 20")
     private String type;
 
     @ApiModelProperty(value = "Data node URL")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String url;
 
     @ApiModelProperty(value = "Data node username")
+    @Length(max = 128, message = "length must be less than or equal to 128")
     private String username;
 
     @ApiModelProperty(value = "Data node token if needed")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String token;
 
     @ApiModelProperty(value = "Extended params")
+    @Length(min = 1, max = 16384, message = "length must be between 1 and 16384")
     private String extParams;
 
     @ApiModelProperty(value = "Description of the data node")
+    @Length(max = 256, message = "length must be less than or equal to 256")
     private String description;
 
     @NotBlank(message = "inCharges cannot be blank")
     @ApiModelProperty(value = "Name of responsible person, separated by commas", required = true)
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String inCharges;
 
     @ApiModelProperty(value = "Version number")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
index a6089adcd..d8ca5f02d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
@@ -24,6 +24,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.hibernate.validator.constraints.Length;
+import org.hibernate.validator.constraints.Range;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
@@ -45,38 +46,51 @@ public abstract class SinkRequest {
 
     @NotBlank(message = "inlongGroupId cannot be blank")
     @ApiModelProperty("Inlong group id")
+    @Length(min = 4, max = 100, message = "length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
     @NotBlank(message = "inlongStreamId cannot be blank")
     @ApiModelProperty("Inlong stream id")
+    @Length(min = 4, max = 100, message = "inlongStreamId length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only supports lowercase letters, numbers, '-', or '_'")
     private String inlongStreamId;
 
     @NotBlank(message = "sinkType cannot be blank")
     @ApiModelProperty("Sink type, including: HIVE, ES, etc.")
+    @Length(max = 15, message = "length must be less than or equal to 15")
     private String sinkType;
 
     @NotBlank(message = "sinkName cannot be blank")
+    @ApiModelProperty("Sink name, unique in one stream")
     @Length(min = 1, max = 100, message = "sinkName length must be between 1 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "sinkName only supports lowercase letters, numbers, '-', or '_'")
-    @ApiModelProperty("Sink name, unique in one stream")
     private String sinkName;
 
     @ApiModelProperty("Sink description")
+    @Length(max = 500, message = "length must be less than or equal to 500")
     private String description;
 
     @ApiModelProperty("Inlong cluster name")
+    @Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongClusterName;
 
     @ApiModelProperty("Data node name")
+    @Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[A-Za-z0-9_-]{1,128}$", message = "only supports letters, numbers, '-', or '_'")
     private String dataNodeName;
 
     @ApiModelProperty("Sort task name")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String sortTaskName;
 
     @ApiModelProperty("Sort consumer group")
+    @Length(max = 512, message = "length must be less than or equal to 512")
     private String sortConsumerGroup;
 
     @ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. Default is 1")
+    @Range(min = 0, max = 1, message = "default is 1, only supports [0: disable, 1: enable]")
     private Integer enableCreateResource = 1;
 
     @ApiModelProperty(value = "Whether to start the process after saving or updating. Default is false")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 7e3db4e63..099ab45e7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -47,14 +47,19 @@ public class SourceRequest {
 
     @NotBlank(message = "inlongGroupId cannot be blank")
     @ApiModelProperty("Inlong group id")
+    @Length(min = 4, max = 100, message = "length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
     @NotBlank(message = "inlongStreamId cannot be blank")
     @ApiModelProperty("Inlong stream id")
+    @Length(min = 4, max = 100, message = "inlongStreamId length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only supports lowercase letters, numbers, '-', or '_'")
     private String inlongStreamId;
 
     @NotBlank(message = "sourceType cannot be blank")
     @ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
+    @Length(min = 1, max = 20, message = "length must be between 1 and 20")
     private String sourceType;
 
     @NotBlank(message = "sourceName cannot be blank")
@@ -64,24 +69,33 @@ public class SourceRequest {
     private String sourceName;
 
     @ApiModelProperty("Ip of the agent running the task")
+    @Length(max = 40, message = "length must be less than or equal to 40")
     private String agentIp;
 
     @ApiModelProperty("Mac uuid of the agent running the task")
+    @Length(max = 30, message = "length must be less than or equal to 30")
     private String uuid;
 
     @ApiModelProperty("Inlong cluster name")
+    @Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongClusterName;
 
     @ApiModelProperty("Inlong cluster node tag")
+    @Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongClusterNodeTag;
 
     @ApiModelProperty("Data node name")
+    @Length(max = 128, message = "length must be less than or equal to 128")
     private String dataNodeName;
 
     @ApiModelProperty("Serialization type, support: csv, json, canal, avro, etc")
+    @Length(max = 20, message = "length must be less than or equal to 20")
     private String serializationType;
 
     @ApiModelProperty("Snapshot of the source task")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
     private String snapshot;
 
     @ApiModelProperty("Version")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
index b8061daff..277e035f8 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamRequest.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.hibernate.validator.constraints.Length;
+import org.hibernate.validator.constraints.Range;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.Pattern;
@@ -41,37 +42,47 @@ public class InlongStreamRequest extends BaseInlongStream {
 
     @NotBlank(message = "inlongGroupId cannot be blank")
     @ApiModelProperty(value = "Inlong group id")
+    @Length(min = 4, max = 100, message = "length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
     @NotBlank(message = "inlongStreamId cannot be blank")
+    @ApiModelProperty(value = "Inlong stream id")
     @Length(min = 4, max = 100, message = "inlongStreamId length must be between 4 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only supports lowercase letters, numbers, '-', or '_'")
-    @ApiModelProperty(value = "Inlong stream id")
     private String inlongStreamId;
 
     @ApiModelProperty(value = "Inlong stream name", required = true)
+    @Length(max = 64, message = "length must be less than or equal to 64")
     private String name;
 
     @ApiModelProperty(value = "Inlong stream description")
+    @Length(max = 256, message = "length must be less than or equal to 256")
     private String description;
 
     @ApiModelProperty(value = "MQ resource")
+    @Length(max = 64, message = "length must be less than or equal to 64")
     private String mqResource;
 
     @ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
+    @Length(max = 20, message = "length must be less than or equal to 20")
     private String dataType;
 
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+    @Length(max = 8, message = "length must be less than or equal to 8")
     private String dataEncoding = StandardCharsets.UTF_8.toString();
 
     @ApiModelProperty(value = "Data separator")
+    @Length(max = 8, message = "length must be less than or equal to 8")
     private String dataSeparator = String.valueOf((int) '|');
 
     @ApiModelProperty(value = "Data field escape symbol")
+    @Length(max = 8, message = "length must be less than or equal to 8")
     private String dataEscapeChar;
 
     @ApiModelProperty(value = "Whether to send synchronously, 0: no, 1: yes", notes = "Each task under this stream sends data synchronously, "
             + "which will affect the throughput of data collection, please choose carefully")
+    @Range(min = 0, max = 1, message = "default is 0, only supports [0: no, 1: yes]")
     private Integer syncSend = 0;
 
     @ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
@@ -90,6 +101,7 @@ public class InlongStreamRequest extends BaseInlongStream {
     private Integer storagePeriod;
 
     @ApiModelProperty(value = "Extended params, will be saved as JSON string")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
     private String extParams;
 
     @ApiModelProperty(value = "Field list")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
index 7a8a24e53..5895d5d44 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
@@ -42,10 +42,14 @@ public class TransformRequest {
 
     @NotBlank(message = "inlongGroupId cannot be blank")
     @ApiModelProperty("Inlong group id")
+    @Length(min = 4, max = 100, message = "length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
     @NotBlank(message = "inlongStreamId cannot be blank")
     @ApiModelProperty("Inlong stream id")
+    @Length(min = 4, max = 100, message = "inlongStreamId length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only supports lowercase letters, numbers, '-', or '_'")
     private String inlongStreamId;
 
     @NotBlank(message = "transformName cannot be blank")
@@ -56,18 +60,22 @@ public class TransformRequest {
 
     @NotBlank(message = "transformType cannot be blank")
     @ApiModelProperty("Transform type, including: splitter, filter, joiner, etc.")
+    @Length(max = 15, message = "length must be less than or equal to 15")
     private String transformType;
 
     @NotBlank(message = "preNodeNames cannot be blank")
     @ApiModelProperty("Pre node names of transform in this stream, join by ','")
+    @Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
     private String preNodeNames;
 
     @NotBlank(message = "postNodeNames cannot be blank")
     @ApiModelProperty("Post node names of transform in this stream, join by ','")
+    @Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
     private String postNodeNames;
 
     @NotBlank(message = "transformDefinition cannot be blank")
     @ApiModelProperty("Transform definition in json type")
+    @Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
     private String transformDefinition;
 
     @ApiModelProperty("Version of transform")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/user/UserRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/user/UserRequest.java
index 15bbae2d0..b84018bec 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/user/UserRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/user/UserRequest.java
@@ -27,6 +27,8 @@ import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.pojo.common.PageRequest;
 import org.apache.inlong.manager.common.enums.UserTypeEnum;
 import org.apache.inlong.manager.common.validation.InEnumInt;
+import org.hibernate.validator.constraints.Length;
+import org.hibernate.validator.constraints.Range;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotBlank;
@@ -48,24 +50,30 @@ public class UserRequest extends PageRequest {
 
     @NotBlank(message = "User name cannot be blank")
     @ApiModelProperty(value = "User name", required = true)
+    @Length(min = 1, max = 256, message = "length must be between 1 and 256")
     private String name;
 
     @ApiModelProperty(value = "Keyword, can be user name")
     private String keyword;
 
     @ApiModelProperty(value = "User password")
+    @Length(min = 1, max = 64, message = "length must be between 1 and 64")
     private String password;
 
     @ApiModelProperty(value = "New password, is required if needs updated")
+    @Length(min = 1, max = 64, message = "length must be between 1 and 64")
     private String newPassword;
 
     @ApiModelProperty("Secret key")
+    @Length(min = 1, max = 256, message = "length must be between 1 and 256")
     private String secretKey;
 
     @ApiModelProperty("Public key")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
     private String publicKey;
 
     @ApiModelProperty("Private key")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
     private String privateKey;
 
     @ApiModelProperty("Encryption key version")
@@ -74,6 +82,7 @@ public class UserRequest extends PageRequest {
     @NotNull(message = "accountType cannot be null")
     @InEnumInt(UserTypeEnum.class)
     @ApiModelProperty(value = "Account type: 0 - manager, 1 - operator", required = true)
+    @Range(min = 0, max = 1, message = "only supports [0: manager, 1: operator]")
     private Integer accountType;
 
     @Min(1)
@@ -85,6 +94,7 @@ public class UserRequest extends PageRequest {
     private Integer version;
 
     @ApiModelProperty(value = "Extension json info")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
     private String extParams;
 
 }


[inlong] 03/07: [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d2fe1614c4ba7ca14ddd4a2edebd60c940802c95
Author: feat <fe...@outlook.com>
AuthorDate: Thu Jan 5 10:35:58 2023 +0800

    [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../protocol/node/extract/KafkaExtractNode.java    | 53 +++++++++++++++++++---
 .../node/extract/KafkaExtractNodeTest.java         | 22 +++++++++
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 718c3c21c..6a0501759 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.protocol.node.extract;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map.Entry;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +40,7 @@ import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -133,7 +136,17 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
     }
 
     /**
-     * generate table options
+     * Generate table options for Kafka extract node.
+     * <p>
+     * Upsert Kafka stores message keys and values as bytes, so there is no need to specify the schema or data types for it.
+     * <br>
+     * The messages of Kafka are serialized and deserialized by formats, e.g. csv, json, avro.
+     * <br>
+     * Thus, the data type mapping is determined by specific formats.
+     * <p>
+     * For more details:
+     * <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/">
+     * upsert-kafka</a>
      *
      * @return options
      */
@@ -142,7 +155,12 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         Map<String, String> options = super.tableOptions();
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
-        if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
+
+        boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
+        Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) format).getInnerFormat() : format;
+        if (realFormat instanceof JsonFormat
+                || realFormat instanceof AvroFormat
+                || realFormat instanceof CsvFormat) {
             if (StringUtils.isEmpty(this.primaryKey)) {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
                 options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
@@ -152,13 +170,14 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
                 if (StringUtils.isNotBlank(scanTimestampMillis)) {
                     options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
                 }
-                options.putAll(format.generateOptions(false));
+                options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg));
             } else {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
-                options.putAll(format.generateOptions(true));
+                options.putAll(delegateInlongFormat(realFormat.generateOptions(true), wrapWithInlongMsg));
             }
-        } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat
-                || format instanceof RawFormat) {
+        } else if (realFormat instanceof CanalJsonFormat
+                || realFormat instanceof DebeziumJsonFormat
+                || realFormat instanceof RawFormat) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
             options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
             if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
@@ -167,7 +186,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
             if (StringUtils.isNotBlank(scanTimestampMillis)) {
                 options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
             }
-            options.putAll(format.generateOptions(false));
+            options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg));
         } else {
             throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
         }
@@ -177,6 +196,26 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         return options;
     }
 
+    private Map<String, String> delegateInlongFormat(
+            Map<String, String> realOptions,
+            boolean wrapWithInlongMsg) {
+        if (!wrapWithInlongMsg) {
+            return realOptions;
+        }
+        Map<String, String> options = new HashMap<>();
+        for (Entry<String, String> entry : realOptions.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if ("format".equals(key)) {
+                options.put("format", "inlong-msg");
+                options.put("inlong-msg.inner.format", value);
+            } else {
+                options.put("inlong-msg." + key, value);
+            }
+        }
+        return options;
+    }
+
     @Override
     public String genTableName() {
         return String.format("table_%s", super.getId());
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index 906db8231..cda33fae9 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -26,6 +26,7 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.junit.Assert;
 import org.junit.Test;
@@ -112,4 +113,25 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
         }
         Assert.assertTrue(formatEquals);
     }
+
+    @Test
+    public void testInLongFormat() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+
+        KafkaExtractNode kafkaNode = getTestObject();
+        InLongMsgFormat inLongMsgFormat = new InLongMsgFormat(new CsvFormat(), false);
+        kafkaNode.setFormat(inLongMsgFormat);
+
+        Map<String, String> options = kafkaNode.tableOptions();
+        assertEquals("inlong-msg", options.get("format"));
+        assertEquals("csv", options.get("inlong-msg.inner.format"));
+        assertEquals("true", options.get("inlong-msg.csv.ignore-parse-errors"));
+
+        kafkaNode.setFormat(new CsvFormat());
+        Map<String, String> csvOptions = kafkaNode.tableOptions();
+        assertEquals("csv", csvOptions.get("format"));
+        assertEquals("true", csvOptions.get("csv.ignore-parse-errors"));
+    }
 }


[inlong] 07/07: [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 3db90d470d53118d8c8b9f7acd3c479b54f63a27
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Thu Jan 5 14:58:31 2023 +0800

    [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
---
 .../apache/inlong/sdk/sort/api/ClientContext.java  |   2 +-
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java   | 138 +++++++++++----------
 2 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
index b22d7dbf4..fb600cfcc 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
@@ -144,7 +144,7 @@ public abstract class ClientContext implements Cleanable {
     private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId);
-        if (topic != null || config.isTopicStaticsEnabled()) {
+        if (topic != null && config.isTopicStaticsEnabled()) {
             dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId());
             dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic());
         }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 4c9b41a9b..f8d39c361 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -111,8 +111,8 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
                 consumer.acknowledgeAsync(messageId)
                         .thenAccept(consumer -> ackSucc(msgOffset))
                         .exceptionally(exception -> {
-                            LOGGER.error("ack fail:{} {},error:{}",
-                                    topic, msgOffset, exception.getMessage(), exception);
+                            LOGGER.error("ack fail:{} {}",
+                                    topic, msgOffset, exception);
                             context.addAckFail(topic, -1);
                             return null;
                         });
@@ -162,9 +162,10 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
             String threadName = String.format("sort_sdk_pulsar_single_topic_fetch_thread_%s_%s_%d",
                     this.topic.getInLongCluster().getClusterId(), topic.getTopic(), this.hashCode());
             this.fetchThread = new Thread(new PulsarSingleTopicFetcher.Fetcher(), threadName);
+            this.fetchThread.setDaemon(true);
             this.fetchThread.start();
         } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
+            LOGGER.error("fail to create consumer", e);
             return false;
         }
         return true;
@@ -203,9 +204,6 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
                 if (consumer != null) {
                     consumer.close();
                 }
-                if (fetchThread != null) {
-                    fetchThread.interrupt();
-                }
             } catch (PulsarClientException e) {
                 LOGGER.warn(e.getMessage(), e);
             }
@@ -239,7 +237,7 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
             } catch (Exception e) {
                 context.addCallBackFail(topic, -1, messageRecords.size(),
                         System.currentTimeMillis() - start);
-                LOGGER.error("failed to callback {}", e.getMessage(), e);
+                LOGGER.error("failed to callback", e);
             }
         }
 
@@ -251,78 +249,82 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
         public void run() {
             boolean hasPermit;
             while (true) {
-                hasPermit = false;
-                long fetchTimeCost = -1;
                 try {
-                    if (context.getConfig().isStopConsume() || stopConsume) {
-                        TimeUnit.MILLISECONDS.sleep(50);
-                        continue;
-                    }
+                    hasPermit = false;
+                    long fetchTimeCost = -1;
+                    try {
+                        if (context.getConfig().isStopConsume() || stopConsume) {
+                            TimeUnit.MILLISECONDS.sleep(50);
+                            continue;
+                        }
 
-                    if (sleepTime > 0) {
-                        TimeUnit.MILLISECONDS.sleep(sleepTime);
-                    }
+                        if (sleepTime > 0) {
+                            TimeUnit.MILLISECONDS.sleep(sleepTime);
+                        }
 
-                    context.acquireRequestPermit();
-                    hasPermit = true;
-                    context.addConsumeTime(topic, -1);
+                        context.acquireRequestPermit();
+                        hasPermit = true;
+                        context.addConsumeTime(topic, -1);
 
-                    long startFetchTime = System.currentTimeMillis();
-                    Messages<byte[]> messages = consumer.batchReceive();
-                    fetchTimeCost = System.currentTimeMillis() - startFetchTime;
-                    if (null != messages && messages.size() != 0) {
-                        for (Message<byte[]> msg : messages) {
-                            // if need seek
-                            if (msg.getPublishTime() < seeker.getSeekTime()) {
-                                seeker.seek();
-                                break;
-                            }
+                        long startFetchTime = System.currentTimeMillis();
+                        Messages<byte[]> messages = consumer.batchReceive();
+                        fetchTimeCost = System.currentTimeMillis() - startFetchTime;
+                        if (null != messages && messages.size() != 0) {
+                            for (Message<byte[]> msg : messages) {
+                                // if need seek
+                                if (msg.getPublishTime() < seeker.getSeekTime()) {
+                                    seeker.seek();
+                                    break;
+                                }
 
-                            String offsetKey = getOffset(msg.getMessageId());
-                            offsetCache.put(offsetKey, msg.getMessageId());
+                                String offsetKey = getOffset(msg.getMessageId());
+                                offsetCache.put(offsetKey, msg.getMessageId());
 
-                            // deserialize
-                            List<InLongMessage> inLongMessages = deserializer
-                                    .deserialize(context, topic, msg.getProperties(), msg.getData());
-                            context.addConsumeSuccess(topic, -1, inLongMessages.size(), msg.getData().length,
-                                    fetchTimeCost);
-                            int originSize = inLongMessages.size();
-                            // intercept
-                            inLongMessages = interceptor.intercept(inLongMessages);
-                            if (inLongMessages.isEmpty()) {
-                                ack(offsetKey);
-                                continue;
-                            }
-                            int filterSize = originSize - inLongMessages.size();
-                            context.addConsumeFilter(topic, -1, filterSize);
+                                // deserialize
+                                List<InLongMessage> inLongMessages = deserializer
+                                        .deserialize(context, topic, msg.getProperties(), msg.getData());
+                                context.addConsumeSuccess(topic, -1, inLongMessages.size(), msg.getData().length,
+                                        fetchTimeCost);
+                                int originSize = inLongMessages.size();
+                                // intercept
+                                inLongMessages = interceptor.intercept(inLongMessages);
+                                if (inLongMessages.isEmpty()) {
+                                    ack(offsetKey);
+                                    continue;
+                                }
+                                int filterSize = originSize - inLongMessages.size();
+                                context.addConsumeFilter(topic, -1, filterSize);
 
-                            List<MessageRecord> msgs = new ArrayList<>();
-                            msgs.add(new MessageRecord(topic.getTopicKey(),
-                                    inLongMessages,
-                                    offsetKey, System.currentTimeMillis()));
-                            handleAndCallbackMsg(msgs);
+                                List<MessageRecord> msgs = new ArrayList<>();
+                                msgs.add(new MessageRecord(topic.getTopicKey(),
+                                        inLongMessages,
+                                        offsetKey, System.currentTimeMillis()));
+                                handleAndCallbackMsg(msgs);
+                            }
+                            sleepTime = 0L;
+                        } else {
+                            context.addConsumeEmpty(topic, -1, fetchTimeCost);
+                            emptyFetchTimes++;
+                            if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
+                                sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()),
+                                        context.getConfig().getMaxEmptyPollSleepMs());
+                                emptyFetchTimes = 0;
+                            }
                         }
-                        sleepTime = 0L;
-                    } else {
-                        context.addConsumeEmpty(topic, -1, fetchTimeCost);
-                        emptyFetchTimes++;
-                        if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
-                            sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()),
-                                    context.getConfig().getMaxEmptyPollSleepMs());
-                            emptyFetchTimes = 0;
+                    } catch (Exception e) {
+                        context.addConsumeError(topic, -1, fetchTimeCost);
+                        LOGGER.error("failed to fetch msg", e);
+                    } finally {
+                        if (hasPermit) {
+                            context.releaseRequestPermit();
                         }
                     }
-                } catch (Exception e) {
-                    context.addConsumeError(topic, -1, fetchTimeCost);
-                    LOGGER.error("failed to fetch msg: {}", e.getMessage(), e);
-                } finally {
-                    if (hasPermit) {
-                        context.releaseRequestPermit();
-                    }
-                }
 
-                if (closed) {
-                    break;
+                    if (closed) {
+                        break;
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("got exception while process fetching", t);
                 }
             }
         }