You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/27 06:07:36 UTC

[inlong] branch master updated: [INLONG-4780][Manager] Combine some constant classes into one (#4783)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e42cb6209 [INLONG-4780][Manager] Combine some constant classes into one (#4783)
e42cb6209 is described below

commit e42cb6209b860004a508c07fa0999185eed697d0
Author: healzhou <he...@gmail.com>
AuthorDate: Mon Jun 27 14:07:31 2022 +0800

    [INLONG-4780][Manager] Combine some constant classes into one (#4783)
---
 .../apache/inlong/manager/client/BaseExample.java  |  4 +-
 .../apache/inlong/manager/client/ut/BaseTest.java  |  4 +-
 .../client/api/util/InlongGroupTransfer.java       | 43 +++++++++---------
 .../InlongConstants.java}                          | 48 ++++++++------------
 .../manager/common/enums/GlobalConstants.java      | 39 -----------------
 .../plugin/listener/DeleteSortListener.java        |  8 ++--
 .../plugin/listener/DeleteStreamListener.java      |  8 ++--
 .../plugin/listener/RestartSortListener.java       | 10 ++---
 .../plugin/listener/RestartStreamListener.java     | 10 ++---
 .../plugin/listener/StartupSortListener.java       | 10 ++---
 .../plugin/listener/StartupStreamListener.java     | 10 ++---
 .../plugin/listener/SuspendSortListener.java       |  8 ++--
 .../plugin/listener/SuspendStreamListener.java     |  8 ++--
 .../plugin/listener/DeleteSortListenerTest.java    |  8 ++--
 .../plugin/listener/RestartSortListenerTest.java   | 10 ++---
 .../plugin/listener/StartupSortListenerTest.java   |  8 ++--
 .../plugin/listener/SuspendSortListenerTest.java   |  8 ++--
 .../service/cluster/AbstractClusterOperator.java   |  4 +-
 .../service/cluster/InlongClusterServiceImpl.java  | 13 +++---
 .../service/core/impl/ConsumptionServiceImpl.java  | 11 +++--
 .../service/core/impl/DataNodeServiceImpl.java     |  6 +--
 .../service/core/impl/InlongStreamServiceImpl.java |  6 +--
 .../core/impl/WorkflowApproverServiceImpl.java     |  4 +-
 .../service/group/AbstractGroupOperator.java       |  4 +-
 .../service/group/InlongPulsarOperator.java        |  2 +-
 .../service/mq/CreatePulsarGroupTaskListener.java  |  4 +-
 .../mq/CreatePulsarResourceTaskListener.java       |  4 +-
 .../service/mq/CreatePulsarTopicTaskListener.java  |  4 +-
 .../manager/service/mq/PulsarEventSelector.java    |  4 +-
 .../service/resource/SinkResourceListener.java     |  4 +-
 .../resource/StreamSinkResourceListener.java       |  4 +-
 .../resource/ck/ClickHouseResourceOperator.java    |  4 +-
 .../resource/es/ElasticsearchResourceOperator.java |  4 +-
 .../resource/hbase/HBaseResourceOperator.java      |  4 +-
 .../resource/hive/HiveResourceOperator.java        |  4 +-
 .../resource/iceberg/IcebergResourceOperator.java  |  4 +-
 .../postgres/PostgresResourceOperator.java         |  4 +-
 .../manager/service/sink/AbstractSinkOperator.java |  6 +--
 .../service/sink/StreamSinkServiceImpl.java        |  6 +--
 .../service/sink/iceberg/IcebergSinkOperator.java  |  4 +-
 .../service/sort/CreateSortConfigListener.java     |  6 +--
 .../service/sort/CreateSortConfigListenerV2.java   |  8 ++--
 .../sort/CreateStreamSortConfigListener.java       |  4 +-
 .../service/sort/light/LightGroupSortListener.java |  6 +--
 .../manager/service/sort/util/DataFlowUtils.java   |  4 +-
 .../service/source/AbstractSourceOperator.java     |  6 +--
 .../transform/StreamTransformServiceImpl.java      |  6 +--
 .../ConsumptionCompleteProcessListener.java        |  4 +-
 .../service/cluster/InlongClusterServiceTest.java  |  3 +-
 .../core/sink/ClickHouseStreamSinkServiceTest.java |  6 +--
 .../core/sink/DLCIcebergStreamSinkServiceTest.java |  6 +--
 .../sink/ElasticsearchStreamSinkServiceTest.java   |  6 +--
 .../core/sink/GreenplumStreamSinkServiceTest.java  |  6 +--
 .../core/sink/HBaseStreamSinkServiceTest.java      |  6 +--
 .../core/sink/HdfsStreamSinkServiceTest.java       |  6 +--
 .../core/sink/HiveStreamSinkServiceTest.java       |  6 +--
 .../core/sink/IcebergStreamSinkServiceTest.java    |  6 +--
 .../core/sink/KafkaStreamSinkServiceTest.java      |  6 +--
 .../core/sink/MysqlStreamSinkServiceTest.java      |  6 +--
 .../core/sink/OracleStreamSinkServiceTest.java     |  6 +--
 .../core/sink/PostgresStreamSinkServiceTest.java   |  6 +--
 .../core/sink/SqlServerStreamSinkServiceTest.java  |  6 +--
 .../sink/TDSQLPostgreSQLStreamSinkServiceTest.java |  6 +--
 .../manager/service/mq/util/PulsarUtilsTest.java   | 51 ++--------------------
 64 files changed, 222 insertions(+), 318 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
index b17f78a70..268001f1d 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.FileFormat;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -111,7 +111,7 @@ public class BaseExample {
         streamInfo.setDataEncoding(StandardCharsets.UTF_8.toString());
         streamInfo.setDataSeparator(DataSeparator.VERTICAL_BAR.getSeparator());
         // if you need strictly order for data, set to 1
-        streamInfo.setSyncSend(GlobalConstants.SYNC_SEND);
+        streamInfo.setSyncSend(InlongConstants.SYNC_SEND);
         streamInfo.setMqResource(this.getTopic());
         return streamInfo;
     }
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
index 6f6899f40..a5f3182f8 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -26,7 +26,7 @@ import org.apache.inlong.manager.common.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.enums.DataSeparator;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.FileFormat;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -142,7 +142,7 @@ public class BaseTest {
         streamInfo.setDataEncoding(StandardCharsets.UTF_8.toString());
         streamInfo.setDataSeparator(DataSeparator.VERTICAL_BAR.getSeparator());
         // if you need strictly order for data, set to 1
-        streamInfo.setSyncSend(GlobalConstants.SYNC_SEND);
+        streamInfo.setSyncSend(InlongConstants.SYNC_SEND);
         streamInfo.setMqResource(TOPIC);
         return streamInfo;
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 26a604e1f..0a247d259 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -25,13 +25,13 @@ import org.apache.inlong.manager.common.auth.Authentication;
 import org.apache.inlong.manager.common.auth.Authentication.AuthType;
 import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
 import org.apache.inlong.manager.common.auth.TokenAuthentication;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sort.BaseSortConf;
 import org.apache.inlong.manager.common.pojo.sort.BaseSortConf.SortType;
 import org.apache.inlong.manager.common.pojo.sort.FlinkSortConf;
 import org.apache.inlong.manager.common.pojo.sort.UserDefinedSortConf;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.AssertUtils;
 
 import java.util.ArrayList;
@@ -47,34 +47,33 @@ public class InlongGroupTransfer {
     /**
      * Create inlong group info from group config.
      */
-    public static InlongGroupInfo createGroupInfo(InlongGroupInfo originGroupInfo, BaseSortConf sortConf) {
-        AssertUtils.notNull(originGroupInfo, "Inlong group info cannot be null");
-        AssertUtils.hasLength(originGroupInfo.getInlongGroupId(), "groupId cannot be empty");
-        originGroupInfo.setExtList(Lists.newArrayList());
+    public static InlongGroupInfo createGroupInfo(InlongGroupInfo groupInfo, BaseSortConf sortConf) {
+        AssertUtils.notNull(groupInfo, "Inlong group info cannot be null");
+        AssertUtils.hasLength(groupInfo.getInlongGroupId(), "groupId cannot be empty");
+        groupInfo.setExtList(Lists.newArrayList());
         // set authentication into group ext list
         List<InlongGroupExtInfo> extInfos = new ArrayList<>();
-        if (originGroupInfo.getAuthentication() != null) {
-            Authentication authentication = originGroupInfo.getAuthentication();
+        if (groupInfo.getAuthentication() != null) {
+            Authentication authentication = groupInfo.getAuthentication();
             AuthType authType = authentication.getAuthType();
             AssertUtils.isTrue(authType == AuthType.TOKEN,
                     String.format("Unsupported authentication:%s for pulsar", authType.name()));
             TokenAuthentication tokenAuthentication = (TokenAuthentication) authentication;
             InlongGroupExtInfo authTypeExt = new InlongGroupExtInfo();
-            authTypeExt.setKeyName(InlongGroupSettings.PULSAR_AUTHENTICATION_TYPE);
+            authTypeExt.setKeyName(InlongConstants.PULSAR_AUTHENTICATION_TYPE);
             authTypeExt.setKeyValue(tokenAuthentication.getAuthType().toString());
             extInfos.add(authTypeExt);
 
             InlongGroupExtInfo authValue = new InlongGroupExtInfo();
-            authValue.setKeyName(InlongGroupSettings.PULSAR_AUTHENTICATION);
+            authValue.setKeyName(InlongConstants.PULSAR_AUTHENTICATION);
             authValue.setKeyValue(tokenAuthentication.getToken());
             extInfos.add(authValue);
 
-            originGroupInfo.getExtList().addAll(extInfos);
+            groupInfo.getExtList().addAll(extInfos);
         }
 
         if (sortConf == null) {
-            throw new IllegalArgumentException(
-                    String.format("sort config cannot be empty for group=", originGroupInfo.getInlongGroupId()));
+            throw new IllegalArgumentException("sort config cannot be empty for group=" + groupInfo.getInlongGroupId());
         }
         // set the sort config into ext list
         SortType sortType = sortConf.getType();
@@ -90,8 +89,8 @@ public class InlongGroupTransfer {
             sortExtInfos = new ArrayList<>();
         }
 
-        originGroupInfo.getExtList().addAll(sortExtInfos);
-        return originGroupInfo;
+        groupInfo.getExtList().addAll(sortExtInfos);
+        return groupInfo;
     }
 
     /**
@@ -100,7 +99,7 @@ public class InlongGroupTransfer {
     public static List<InlongGroupExtInfo> createFlinkExtInfo(FlinkSortConf flinkSortConf) {
         List<InlongGroupExtInfo> extInfos = new ArrayList<>();
         InlongGroupExtInfo sortType = new InlongGroupExtInfo();
-        sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
+        sortType.setKeyName(InlongConstants.SORT_TYPE);
         sortType.setKeyValue(SortType.FLINK.getType());
         extInfos.add(sortType);
         if (flinkSortConf.getAuthentication() != null) {
@@ -110,23 +109,23 @@ public class InlongGroupTransfer {
                     String.format("Unsupported authentication:%s for flink", authType.name()));
             final SecretTokenAuthentication secretTokenAuthentication = (SecretTokenAuthentication) authentication;
             InlongGroupExtInfo authTypeExt = new InlongGroupExtInfo();
-            authTypeExt.setKeyName(InlongGroupSettings.SORT_AUTHENTICATION_TYPE);
+            authTypeExt.setKeyName(InlongConstants.SORT_AUTHENTICATION_TYPE);
             authTypeExt.setKeyValue(authType.toString());
             extInfos.add(authTypeExt);
             InlongGroupExtInfo authValue = new InlongGroupExtInfo();
-            authValue.setKeyName(InlongGroupSettings.SORT_AUTHENTICATION);
+            authValue.setKeyName(InlongConstants.SORT_AUTHENTICATION);
             authValue.setKeyValue(secretTokenAuthentication.toString());
             extInfos.add(authValue);
         }
         if (StringUtils.isNotEmpty(flinkSortConf.getServiceUrl())) {
             InlongGroupExtInfo flinkUrl = new InlongGroupExtInfo();
-            flinkUrl.setKeyName(InlongGroupSettings.SORT_URL);
+            flinkUrl.setKeyName(InlongConstants.SORT_URL);
             flinkUrl.setKeyValue(flinkSortConf.getServiceUrl());
             extInfos.add(flinkUrl);
         }
         if (MapUtils.isNotEmpty(flinkSortConf.getProperties())) {
             InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
-            flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+            flinkProperties.setKeyName(InlongConstants.SORT_PROPERTIES);
             try {
                 flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(flinkSortConf.getProperties()));
             } catch (Exception e) {
@@ -143,16 +142,16 @@ public class InlongGroupTransfer {
     public static List<InlongGroupExtInfo> createUserDefinedSortExtInfo(UserDefinedSortConf userDefinedSortConf) {
         List<InlongGroupExtInfo> extInfos = new ArrayList<>();
         InlongGroupExtInfo sortType = new InlongGroupExtInfo();
-        sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
+        sortType.setKeyName(InlongConstants.SORT_TYPE);
         sortType.setKeyValue(SortType.USER_DEFINED.getType());
         extInfos.add(sortType);
         InlongGroupExtInfo sortName = new InlongGroupExtInfo();
-        sortName.setKeyName(InlongGroupSettings.SORT_NAME);
+        sortName.setKeyName(InlongConstants.SORT_NAME);
         sortName.setKeyValue(userDefinedSortConf.getSortName());
         extInfos.add(sortName);
         if (MapUtils.isNotEmpty(userDefinedSortConf.getProperties())) {
             InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
-            flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+            flinkProperties.setKeyName(InlongConstants.SORT_PROPERTIES);
             try {
                 flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(userDefinedSortConf.getProperties()));
             } catch (Exception e) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
similarity index 68%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index d8fb440e6..f1c336d49 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -15,29 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.settings;
+package org.apache.inlong.manager.common.consts;
 
-public class InlongGroupSettings {
+/**
+ * Global constant for the Inlong system.
+ */
+public class InlongConstants {
 
-    /**
-     * config of group
-     */
-    public static final String DATA_FLOW_GROUP_ID_KEY = "inlong.group.id";
+    public static final Integer UN_DELETED = 0;
 
-    public static final String DATA_FLOW = "dataFlow";
+    public static final Integer IS_DELETED = 1;
 
-    /**
-     * Refer to{@link org.apache.inlong.manager.common.enums.GroupMode#getMode}
-     */
-    public static final String GROUP_MODE = "group.mode";
+    public static final Integer DELETED_STATUS = 10;
 
-    /**
-     * config of pulsar
-     */
-    public static final String PULSAR_ADMIN_URL = "pulsar_adminUrl";
+    public static final Integer DISABLE_CREATE_RESOURCE = 0;
+
+    public static final Integer ENABLE_CREATE_RESOURCE = 1;
+
+    public static final Integer SYNC_SEND = 1;
 
-    public static final String PULSAR_SERVICE_URL = "pulsar_serviceUrl";
+    public static final Integer UN_SYNC_SEND = 0;
 
+    /**
+     * Pulsar config
+     */
     public static final String PULSAR_AUTHENTICATION = "pulsar.authentication";
 
     public static final String PULSAR_AUTHENTICATION_TYPE = "pulsar.authentication.type";
@@ -52,22 +53,11 @@ public class InlongGroupSettings {
     public static final String PULSAR_TOPIC_FORMAT = "persistent://%s/%s/%s";
 
     /**
-     * config of tube mq
+     * Sort config
      */
-    public static final String TUBE_MANAGER_URL = "tube.manager.url";
-
-    public static final String TUBE_MASTER_URL = "tube.master.url";
 
-    public static final String TUBE_CLUSTER_ID = "tube.cluster.id";
-
-    /**
-     * config of dataproxy
-     */
-    public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
+    public static final String DATA_FLOW = "dataFlow";
 
-    /**
-     * config of sort
-     */
     public static final String SORT_JOB_ID = "sort.job.id";
 
     public static final String SORT_TYPE = "sort.type";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GlobalConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GlobalConstants.java
deleted file mode 100644
index 40be383bd..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GlobalConstants.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.enums;
-
-/**
- * Global constant for system
- */
-public class GlobalConstants {
-
-    public static final Integer UN_DELETED = 0;
-
-    public static final Integer IS_DELETED = 1;
-
-    public static final Integer DELETED_STATUS = 10;
-
-    public static final Integer DISABLE_CREATE_RESOURCE = 0;
-
-    public static final Integer ENABLE_CREATE_RESOURCE = 1;
-
-    public static final Integer SYNC_SEND = 1;
-
-    public static final Integer UN_SYNC_SEND = 0;
-
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index ccdb162ef..8c3a0370c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -25,7 +25,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -70,7 +70,7 @@ public class DeleteSortListener implements SortOperateListener {
 
         Map<String, String> kvConf = extList.stream().collect(
                 Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
                     groupId);
@@ -82,7 +82,7 @@ public class DeleteSortListener implements SortOperateListener {
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s]", groupId);
             return ListenerResult.fail(message);
@@ -90,7 +90,7 @@ public class DeleteSortListener implements SortOperateListener {
 
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 4b2b2ff65..926146a79 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -72,7 +72,7 @@ public class DeleteStreamListener implements SortOperateListener {
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format(
                     "delete sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
@@ -85,7 +85,7 @@ public class DeleteStreamListener implements SortOperateListener {
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
             return ListenerResult.fail(message);
@@ -93,7 +93,7 @@ public class DeleteStreamListener implements SortOperateListener {
 
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 507972587..cd552d9d8 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -21,11 +21,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -71,7 +71,7 @@ public class RestartSortListener implements SortOperateListener {
 
         Map<String, String> kvConf = extList.stream().collect(
                 Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
                     groupId);
@@ -83,12 +83,12 @@ public class RestartSortListener implements SortOperateListener {
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s]", groupId);
             return ListenerResult.fail(message);
         }
-        String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
+        String dataFlows = kvConf.get(InlongConstants.DATA_FLOW);
         if (StringUtils.isEmpty(dataFlows)) {
             String message = String.format("dataflow is empty for groupId [%s]", groupId);
             log.error(message);
@@ -99,7 +99,7 @@ public class RestartSortListener implements SortOperateListener {
         flinkInfo.setJobId(jobId);
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
         flinkInfo.setJobName(jobName);
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 3974da351..08a554119 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -21,13 +21,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -73,7 +73,7 @@ public class RestartStreamListener implements SortOperateListener {
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format(
                     "restart sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
@@ -86,12 +86,12 @@ public class RestartStreamListener implements SortOperateListener {
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
             return ListenerResult.fail(message);
         }
-        String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
+        String dataFlows = kvConf.get(InlongConstants.DATA_FLOW);
         if (StringUtils.isEmpty(dataFlows)) {
             String message = String.format("dataflow is empty for groupId [%s] streamId [%s]", groupId, streamId);
             log.error(message);
@@ -102,7 +102,7 @@ public class RestartStreamListener implements SortOperateListener {
         flinkInfo.setJobId(jobId);
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
         flinkInfo.setJobName(jobName);
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 3d99974ab..76d81587d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -21,11 +21,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -73,7 +73,7 @@ public class StartupSortListener implements SortOperateListener {
                 && StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
                 InlongGroupExtInfo::getKeyName,
                 InlongGroupExtInfo::getKeyValue));
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isNotEmpty(sortExt)) {
             Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
                     new TypeReference<Map<String, String>>() {
@@ -81,7 +81,7 @@ public class StartupSortListener implements SortOperateListener {
             kvConf.putAll(result);
         }
 
-        String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
+        String dataFlows = kvConf.get(InlongConstants.DATA_FLOW);
         if (StringUtils.isEmpty(dataFlows)) {
             String message = String.format("dataflow is empty for groupId [%s]", groupId);
             log.error(message);
@@ -91,7 +91,7 @@ public class StartupSortListener implements SortOperateListener {
         FlinkInfo flinkInfo = new FlinkInfo();
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
         flinkInfo.setJobName(jobName);
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
         flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos());
 
@@ -113,7 +113,7 @@ public class StartupSortListener implements SortOperateListener {
             return ListenerResult.fail(message + e.getMessage());
         }
 
-        saveInfo(groupId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), extList);
+        saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), extList);
         flinkOperation.pollJobStatus(flinkInfo);
         return ListenerResult.success();
     }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index b64633830..e37f91355 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -21,13 +21,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -74,7 +74,7 @@ public class StartupStreamListener implements SortOperateListener {
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isNotEmpty(sortExt)) {
             Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
                     new TypeReference<Map<String, String>>() {
@@ -82,7 +82,7 @@ public class StartupStreamListener implements SortOperateListener {
             kvConf.putAll(result);
         }
 
-        String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
+        String dataFlows = kvConf.get(InlongConstants.DATA_FLOW);
         if (StringUtils.isEmpty(dataFlows)) {
             String message = String.format("dataflow is empty for groupId [%s] and streamId [%s]", groupId, streamId);
             log.error(message);
@@ -92,7 +92,7 @@ public class StartupStreamListener implements SortOperateListener {
         FlinkInfo flinkInfo = new FlinkInfo();
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
         flinkInfo.setJobName(jobName);
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
@@ -113,7 +113,7 @@ public class StartupStreamListener implements SortOperateListener {
             return ListenerResult.fail(message + e.getMessage());
         }
 
-        saveInfo(groupId, streamId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), streamExtList);
+        saveInfo(groupId, streamId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), streamExtList);
         flinkOperation.pollJobStatus(flinkInfo);
         return ListenerResult.success();
     }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 16ebfd291..bba9c1e19 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -25,7 +25,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -70,7 +70,7 @@ public class SuspendSortListener implements SortOperateListener {
 
         Map<String, String> kvConf = extList.stream().collect(
                 Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
                     groupId);
@@ -82,7 +82,7 @@ public class SuspendSortListener implements SortOperateListener {
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s]", groupId);
             return ListenerResult.fail(message);
@@ -90,7 +90,7 @@ public class SuspendSortListener implements SortOperateListener {
 
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index e5cb9cb7b..ee54b9571 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -21,13 +21,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -72,7 +72,7 @@ public class SuspendStreamListener implements SortOperateListener {
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
-        String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+        String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format(
                     "suspend sort failed for groupId [%s] streamId [%s], as the sort properties is empty",
@@ -85,7 +85,7 @@ public class SuspendStreamListener implements SortOperateListener {
                 new TypeReference<Map<String, String>>() {
                 });
         kvConf.putAll(result);
-        String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+        String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
             return ListenerResult.fail(message);
@@ -93,7 +93,7 @@ public class SuspendStreamListener implements SortOperateListener {
 
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-        String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+        String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
index e91dacde5..58527432b 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.junit.jupiter.api.Test;
@@ -46,13 +46,13 @@ public class DeleteSortListenerTest {
         groupResourceProcessForm.setGroupInfo(pulsarInfo);
 
         InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
-        inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
+        inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
         inlongGroupExtInfo1.setKeyValue("127.0.0.1:8085");
         List<InlongGroupExtInfo> inlongGroupExtInfos = new ArrayList<>();
         inlongGroupExtInfos.add(inlongGroupExtInfo1);
 
         InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
-        inlongGroupExtInfo2.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+        inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> sortProperties = new HashMap<>(16);
         String sortStr = objectMapper.writeValueAsString(sortProperties);
@@ -60,7 +60,7 @@ public class DeleteSortListenerTest {
         inlongGroupExtInfos.add(inlongGroupExtInfo2);
 
         InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
-        inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
+        inlongGroupExtInfo5.setKeyName(InlongConstants.SORT_JOB_ID);
         inlongGroupExtInfo5.setKeyValue("d7e613fb18876f173ec5ba17465fae64");
         inlongGroupExtInfos.add(inlongGroupExtInfo5);
 
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
index 906ecbad7..9134eb704 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
@@ -18,10 +18,10 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.junit.jupiter.api.Test;
 
@@ -45,13 +45,13 @@ public class RestartSortListenerTest {
         groupResourceProcessForm.setGroupInfo(pulsarInfo);
 
         InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
-        inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
+        inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
         inlongGroupExtInfo1.setKeyValue("127.0.0.1:8081");
         List<InlongGroupExtInfo> inlongGroupExtInfoList = new ArrayList<>();
         inlongGroupExtInfoList.add(inlongGroupExtInfo1);
 
         InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
-        inlongGroupExtInfo2.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+        inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> sortProperties = new HashMap<>(16);
         String sortStr = objectMapper.writeValueAsString(sortProperties);
@@ -59,12 +59,12 @@ public class RestartSortListenerTest {
         inlongGroupExtInfoList.add(inlongGroupExtInfo2);
 
         InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
-        inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
+        inlongGroupExtInfo5.setKeyName(InlongConstants.SORT_JOB_ID);
         inlongGroupExtInfo5.setKeyValue("efdc85a977e72e0d9a99170d78f03ddb");
         inlongGroupExtInfoList.add(inlongGroupExtInfo5);
 
         InlongGroupExtInfo inlongGroupExtInfo6 = new InlongGroupExtInfo();
-        inlongGroupExtInfo6.setKeyName(InlongGroupSettings.DATA_FLOW);
+        inlongGroupExtInfo6.setKeyName(InlongConstants.DATA_FLOW);
         inlongGroupExtInfo6.setKeyValue("{\"streamId\":{\n"
                 + "    \"id\":1,\n"
                 + "    \"source_info\":{\n"
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
index 5f6f97d03..cafec2a56 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
@@ -18,10 +18,10 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.junit.jupiter.api.Test;
 
@@ -48,12 +48,12 @@ public class StartupSortListenerTest {
 
         List<InlongGroupExtInfo> inlongGroupExtInfos = new ArrayList<>();
         InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
-        inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
+        inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
         inlongGroupExtInfo1.setKeyValue("127.0.0.1:8085");
         inlongGroupExtInfos.add(inlongGroupExtInfo1);
 
         InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
-        inlongGroupExtInfo2.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+        inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> sortProperties = new HashMap<>(16);
         String sortStr = objectMapper.writeValueAsString(sortProperties);
@@ -61,7 +61,7 @@ public class StartupSortListenerTest {
         inlongGroupExtInfos.add(inlongGroupExtInfo2);
 
         InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
-        inlongGroupExtInfo5.setKeyName(InlongGroupSettings.DATA_FLOW);
+        inlongGroupExtInfo5.setKeyName(InlongConstants.DATA_FLOW);
         inlongGroupExtInfo5.setKeyValue("{\"streamId\":{\n"
                 + "    \"id\": 1,\n"
                 + "    \"source_info\":\n"
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
index 4ea39ba8f..f9f09a2c1 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
@@ -18,10 +18,10 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.junit.jupiter.api.Test;
 
@@ -45,13 +45,13 @@ public class SuspendSortListenerTest {
         groupResourceProcessForm.setGroupInfo(pulsarInfo);
 
         InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
-        inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
+        inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
         inlongGroupExtInfo1.setKeyValue("127.0.0.1:8085");
         List<InlongGroupExtInfo> inlongGroupExtInfos = new ArrayList<>();
         inlongGroupExtInfos.add(inlongGroupExtInfo1);
 
         InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
-        inlongGroupExtInfo2.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+        inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, String> sortProperties = new HashMap<>(16);
         String sortStr = objectMapper.writeValueAsString(sortProperties);
@@ -59,7 +59,7 @@ public class SuspendSortListenerTest {
         inlongGroupExtInfos.add(inlongGroupExtInfo2);
 
         InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
-        inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
+        inlongGroupExtInfo5.setKeyName(InlongConstants.SORT_JOB_ID);
         inlongGroupExtInfo5.setKeyValue("ea405ab424cfc35ae9be93df8ea87917");
         inlongGroupExtInfos.add(inlongGroupExtInfo5);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
index ef799913b..807988ff3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.cluster;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
@@ -45,7 +45,7 @@ public abstract class AbstractClusterOperator implements InlongClusterOperator {
 
         entity.setCreator(operator);
         entity.setCreateTime(new Date());
-        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
         clusterMapper.insert(entity);
 
         return entity.getId();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 2f051ee0c..47ba191c0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -28,9 +28,9 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -44,7 +44,6 @@ import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
@@ -194,7 +193,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
     public Boolean delete(Integer id, String operator) {
         Preconditions.checkNotNull(id, "cluster id cannot be empty");
         InlongClusterEntity entity = clusterMapper.selectById(id);
-        if (entity == null || entity.getIsDeleted() > GlobalConstants.UN_DELETED) {
+        if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
             LOGGER.error("inlong cluster not found by id={}, or was already deleted", id);
             return false;
         }
@@ -223,7 +222,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         InlongClusterNodeEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
         entity.setCreator(operator);
         entity.setCreateTime(new Date());
-        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
         clusterNodeMapper.insert(entity);
 
         LOGGER.info("success to add inlong cluster node={}", request);
@@ -308,7 +307,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
     public Boolean deleteNode(Integer id, String operator) {
         Preconditions.checkNotNull(id, "cluster node id cannot be empty");
         InlongClusterNodeEntity entity = clusterNodeMapper.selectById(id);
-        if (entity == null || entity.getIsDeleted() > GlobalConstants.UN_DELETED) {
+        if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
             LOGGER.error("inlong cluster node not found by id={}", id);
             return false;
         }
@@ -404,11 +403,11 @@ public class InlongClusterServiceImpl implements InlongClusterService {
                     PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams());
                     String tenant = pulsarCluster.getTenant();
                     if (StringUtils.isBlank(tenant)) {
-                        tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+                        tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
                     }
 
                     String streamId = streamInfo.getInlongStreamId();
-                    String topic = String.format(InlongGroupSettings.PULSAR_TOPIC_FORMAT,
+                    String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
                             tenant, mqResource, streamInfo.getMqResource());
                     DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
                     topicConfig.setInlongGroupId(groupId + "/" + streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index f1b0c5d1b..e8a7f578b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -22,10 +22,10 @@ import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -40,7 +40,6 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.user.UserRoleCode;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.LoginUserUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -346,7 +345,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         entity.setFilterEnabled(0);
 
         entity.setStatus(ConsumptionStatus.APPROVED.getStatus());
-        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
         entity.setCreator(groupInfo.getCreator());
         entity.setCreateTime(new Date());
 
@@ -357,7 +356,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
             pulsarEntity.setConsumptionId(entity.getId());
             pulsarEntity.setConsumerGroup(consumerGroup);
             pulsarEntity.setInlongGroupId(groupId);
-            pulsarEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            pulsarEntity.setIsDeleted(InlongConstants.UN_DELETED);
             consumptionPulsarMapper.insert(pulsarEntity);
         }
 
@@ -413,8 +412,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
                 PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
                         inlongGroupEntity.getInlongClusterTag(), null, ClusterType.CLS_PULSAR);
                 String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
-                        ? InlongGroupSettings.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
-                info.setTopic(String.format(InlongGroupSettings.PULSAR_TOPIC_FORMAT, tenant,
+                        ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
+                info.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
                         inlongGroupEntity.getMqResource(), info.getTopic()));
             }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
index 2a72625ec..fcb834fd4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
@@ -21,7 +21,7 @@ import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import org.apache.inlong.manager.common.enums.DataNodeType;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.node.DataNodePageRequest;
 import org.apache.inlong.manager.common.pojo.node.DataNodeRequest;
@@ -72,7 +72,7 @@ public class DataNodeServiceImpl implements DataNodeService {
         DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
         entity.setCreator(operator);
         entity.setCreateTime(new Date());
-        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
         dataNodeMapper.insert(entity);
 
         LOGGER.debug("success to save data node={}", request);
@@ -138,7 +138,7 @@ public class DataNodeServiceImpl implements DataNodeService {
     public Boolean delete(Integer id, String operator) {
         Preconditions.checkNotNull(id, "data node id cannot be empty");
         DataNodeEntity entity = dataNodeMapper.selectById(id);
-        if (entity == null || entity.getIsDeleted() > GlobalConstants.UN_DELETED) {
+        if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
             LOGGER.error("data node not found or was already deleted for id={}", id);
             return false;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 121c25e50..e220b3285 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -23,7 +23,7 @@ import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -486,7 +486,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         streamEntity.setMaxLength(1000);
 
         streamEntity.setStatus(StreamStatus.CONFIG_SUCCESSFUL.getCode());
-        streamEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+        streamEntity.setIsDeleted(InlongConstants.UN_DELETED);
         streamEntity.setCreator(operator);
         streamEntity.setModifier(operator);
         Date now = new Date();
@@ -531,7 +531,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         for (InlongStreamFieldEntity entity : list) {
             entity.setInlongGroupId(groupId);
             entity.setInlongStreamId(streamId);
-            entity.setIsDeleted(GlobalConstants.UN_DELETED);
+            entity.setIsDeleted(InlongConstants.UN_DELETED);
         }
         streamFieldMapper.insertAll(list);
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
index 20cb9ebb3..961459fb5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.pojo.workflow.FilterKey;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowApprover;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
@@ -119,7 +119,7 @@ public class WorkflowApproverServiceImpl implements WorkflowApproverService {
         Preconditions.checkEmpty(exist, "already exit the same config");
 
         WorkflowApproverEntity entity = CommonBeanUtils.copyProperties(approver, WorkflowApproverEntity::new);
-        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
         int success = this.workflowApproverMapper.insert(entity);
         Preconditions.checkTrue(success == 1, "add failed");
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
index e4aed9328..ba736d65f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.group;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
@@ -54,7 +54,7 @@ public abstract class AbstractGroupOperator implements InlongGroupOperator {
 
         // after saving, the status is set to [GROUP_WAIT_SUBMIT]
         entity.setStatus(GroupStatus.TO_BE_SUBMIT.getCode());
-        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
         entity.setCreator(operator);
         entity.setCreateTime(new Date());
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
index 8b27040d2..4d9b700a7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
@@ -110,7 +110,7 @@ public class InlongPulsarOperator extends AbstractGroupOperator {
         // TODO add cache for cluster info
         // pulsar topic corresponds to the inlong stream one-to-one
         // topicInfo.setDsTopicList(streamService.getTopicList(groupInfo.getInlongGroupId()));
-        // commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL);
+        // commonOperateService.getSpecifiedParam(InlongConstants.TUBE_MASTER_URL);
         // groupInfo.setTenant();
         // groupInfo.setAdminUrl();
         // groupInfo.setServiceUrl();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
index e50e14f37..460dbf928 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -90,7 +90,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
             String tenant = pulsarCluster.getTenant();
             if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
             }
             String namespace = groupInfo.getMqResource();
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
index 146669fad..bdc45b613 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -107,7 +107,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
             // create pulsar tenant
             String tenant = pulsarCluster.getTenant();
             if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
             }
             pulsarOperator.createTenant(pulsarAdmin, tenant);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
index 5b7e63725..3a506aa4c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
@@ -28,7 +29,6 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.mq.util.PulsarOperator;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
@@ -74,7 +74,7 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
             PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             String tenant = pulsarCluster.getTenant();
             if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
             }
 
             try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
index 8a6c45d18..d9f33b2a9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
@@ -46,7 +46,7 @@ public class PulsarEventSelector implements EventSelector {
         MQType mqType = MQType.forType(groupInfo.getMqType());
         if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
-            boolean enable = GlobalConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+            boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
             if (enable) {
                 log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
                 return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
index 029958a92..2e0450479 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.resource;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -66,7 +66,7 @@ public class SinkResourceListener implements SinkOperateListener {
         }
         List<SinkInfo> configList = sinkMapper.selectAllConfig(groupId, streamIdList);
         List<SinkInfo> needCreateList = configList.stream()
-                .filter(sinkInfo -> GlobalConstants.ENABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
+                .filter(sinkInfo -> InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
                 .collect(Collectors.toList());
 
         if (CollectionUtils.isEmpty(needCreateList)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
index 0a28bd9c4..cde624ed7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.service.resource;
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -63,7 +63,7 @@ public class StreamSinkResourceListener implements SinkOperateListener {
 
         List<SinkInfo> sinkInfos = sinkEntityMapper.selectAllConfig(groupId, Lists.newArrayList(streamId));
         List<SinkInfo> needCreateResources = sinkInfos.stream()
-                .filter(sinkInfo -> GlobalConstants.ENABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
+                .filter(sinkInfo -> InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
                 .collect(Collectors.toList());
 
         if (CollectionUtils.isEmpty(needCreateResources)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java
index 91fd00b36..f055d1344 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.resource.ck;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -67,7 +67,7 @@ public class ClickHouseResourceOperator implements SinkResourceOperator {
         if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
             LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
             return;
-        } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
             LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
             return;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
index f5cc4bf0f..fb6c4883c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.resource.es;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -68,7 +68,7 @@ public class ElasticsearchResourceOperator implements SinkResourceOperator {
         if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
             LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
             return;
-        } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
             LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
             return;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HBaseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HBaseResourceOperator.java
index f6c66dc1b..4346fff41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HBaseResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HBaseResourceOperator.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.resource.hbase;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -73,7 +73,7 @@ public class HBaseResourceOperator implements SinkResourceOperator {
         if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
             LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
             return;
-        } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
             LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
             return;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
index 1fb67bf45..40b4349ff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.resource.hive;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -70,7 +70,7 @@ public class HiveResourceOperator implements SinkResourceOperator {
         if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
             LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
             return;
-        } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
             LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
             return;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
index bb1857eb8..2beafec60 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.resource.iceberg;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -70,7 +70,7 @@ public class IcebergResourceOperator implements SinkResourceOperator {
         if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
             LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
             return;
-        } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
             LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
             return;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java
index d4ab18bc0..74e00f3ba 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.resource.postgres;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -69,7 +69,7 @@ public class PostgresResourceOperator implements SinkResourceOperator {
         if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
             LOGGER.warn("postgres resource [" + sinkInfo.getId() + "] already success, skip to create");
             return;
-        } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
             LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
             return;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 3cc13ff45..f422e553b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -81,7 +81,7 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
     public Integer saveOpt(SinkRequest request, String operator) {
         StreamSinkEntity entity = CommonBeanUtils.copyProperties(request, StreamSinkEntity::new);
         entity.setStatus(SinkStatus.NEW.getCode());
-        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
         entity.setCreator(operator);
         entity.setModifier(operator);
         Date now = new Date();
@@ -163,7 +163,7 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
             fieldEntity.setInlongStreamId(streamId);
             fieldEntity.setSinkType(sinkType);
             fieldEntity.setSinkId(sinkId);
-            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
             entityList.add(fieldEntity);
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 13091daf3..4d396b614 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -25,7 +25,7 @@ import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
@@ -252,7 +252,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
         entity.setPreviousStatus(entity.getStatus());
-        entity.setStatus(GlobalConstants.DELETED_STATUS);
+        entity.setStatus(InlongConstants.DELETED_STATUS);
         entity.setIsDeleted(id);
         entity.setModifier(operator);
         entity.setModifyTime(new Date());
@@ -279,7 +279,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             entityList.forEach(entity -> {
                 Integer id = entity.getId();
                 entity.setPreviousStatus(entity.getStatus());
-                entity.setStatus(GlobalConstants.DELETED_STATUS);
+                entity.setStatus(InlongConstants.DELETED_STATUS);
                 entity.setIsDeleted(id);
                 entity.setModifier(operator);
                 entity.setModifyTime(now);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index 1f5c204d4..f7d397e9f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -24,7 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -149,7 +149,7 @@ public class IcebergSinkOperator extends AbstractSinkOperator {
             fieldEntity.setInlongStreamId(streamId);
             fieldEntity.setSinkType(sinkType);
             fieldEntity.setSinkId(sinkId);
-            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
             entityList.add(fieldEntity);
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
index 19cf294d8..0369f6a2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
@@ -21,13 +21,13 @@ import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.DataFlowUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -93,7 +93,7 @@ public class CreateSortConfigListener implements SortOperateListener {
             // String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
             InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
             extInfo.setInlongGroupId(groupId);
-            extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+            extInfo.setKeyName(InlongConstants.DATA_FLOW);
             // extInfo.setKeyValue(dataFlows);
             if (groupInfo.getExtList() == null) {
                 groupInfo.setExtList(Lists.newArrayList());
@@ -107,7 +107,7 @@ public class CreateSortConfigListener implements SortOperateListener {
     }
 
     private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
-        groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
+        groupInfo.getExtList().removeIf(ext -> InlongConstants.DATA_FLOW.equals(ext.getKeyName()));
         groupInfo.getExtList().add(extInfo);
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 5276fd5b8..1aa87a37d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.MQType;
@@ -38,7 +39,6 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
 import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
@@ -96,7 +96,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
 
         InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
         extInfo.setInlongGroupId(groupId);
-        extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+        extInfo.setKeyName(InlongConstants.DATA_FLOW);
         extInfo.setKeyValue(dataFlows);
         if (groupInfo.getExtList() == null) {
             groupInfo.setExtList(Lists.newArrayList());
@@ -144,7 +144,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
         String adminUrl = pulsarCluster.getAdminUrl();
         String serviceUrl = pulsarCluster.getUrl();
-        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) ? InlongGroupSettings.DEFAULT_PULSAR_TENANT
+        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) ? InlongConstants.DEFAULT_PULSAR_TENANT
                 : pulsarCluster.getTenant();
         streamInfoList.forEach(streamInfo -> {
             PulsarSource pulsarSource = new PulsarSource();
@@ -194,7 +194,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
     }
 
     private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
-        groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
+        groupInfo.getExtList().removeIf(ext -> InlongConstants.DATA_FLOW.equals(ext.getKeyName()));
         groupInfo.getExtList().add(extInfo);
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 22723787c..8c144b023 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -37,7 +37,7 @@ import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
@@ -105,7 +105,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
             InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
             extInfo.setInlongGroupId(groupId);
             extInfo.setInlongStreamId(streamId);
-            String keyName = InlongGroupSettings.DATA_FLOW;
+            String keyName = InlongConstants.DATA_FLOW;
             extInfo.setKeyName(keyName);
             extInfo.setKeyValue(dataFlows);
             if (streamInfo.getExtList() == null) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
index 6308b70cd..2311e2834 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sort.light;
 
 import com.google.common.collect.Lists;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -27,7 +28,6 @@ import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 import org.apache.inlong.manager.common.pojo.workflow.form.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
 import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
@@ -86,7 +86,7 @@ public class LightGroupSortListener implements SortOperateListener {
             String dataFlows = OBJECT_MAPPER.writeValueAsString(configInfo);
             InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
             extInfo.setInlongGroupId(groupId);
-            extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+            extInfo.setKeyName(InlongConstants.DATA_FLOW);
             extInfo.setKeyValue(dataFlows);
             if (groupInfo.getExtList() == null) {
                 groupInfo.setExtList(Lists.newArrayList());
@@ -145,7 +145,7 @@ public class LightGroupSortListener implements SortOperateListener {
     }
 
     private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
-        groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
+        groupInfo.getExtList().removeIf(ext -> InlongConstants.DATA_FLOW.equals(ext.getKeyName()));
         groupInfo.getExtList().add(extInfo);
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
index 51767692b..6764219b9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
@@ -69,7 +69,7 @@ public class DataFlowUtils {
         FieldMappingRule fieldMappingRule = new FieldMappingRule(mappingUnitList.toArray(new FieldMappingUnit[0]));
 
         // Get source info
-        String masterAddress = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL);
+        String masterAddress = commonOperateService.getSpecifiedParam(InlongConstants.TUBE_MASTER_URL);
         PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
         org.apache.inlong.sort.protocol.source.SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster,
                 masterAddress, clusterBean,
@@ -86,7 +86,7 @@ public class DataFlowUtils {
         if (MapUtils.isNotEmpty(streamSink.getProperties())) {
             properties.putAll(streamSink.getProperties());
         }
-        properties.put(InlongGroupSettings.DATA_FLOW_GROUP_ID_KEY, groupId);
+        properties.put(InlongConstants.DATA_FLOW_GROUP_ID_KEY, groupId);
 
         return new DataFlowInfo(streamSink.getId(), sourceInfo, transInfo, sinkInfo, properties);
     }*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 94a72b35d..8c1281d29 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.service.source;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -103,7 +103,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
         Date now = new Date();
         entity.setCreateTime(now);
         entity.setModifyTime(now);
-        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setIsDeleted(InlongConstants.UN_DELETED);
         // get the ext params
         setTargetEntity(request, entity);
         sourceMapper.insert(entity);
@@ -236,7 +236,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
             fieldEntity.setInlongStreamId(streamId);
             fieldEntity.setSourceId(sourceId);
             fieldEntity.setSourceType(sourceType);
-            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
             entityList.add(fieldEntity);
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 9865d9660..0badb762f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -22,7 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
@@ -86,7 +86,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         Date now = new Date();
         transformEntity.setCreateTime(now);
         transformEntity.setModifyTime(now);
-        transformEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+        transformEntity.setIsDeleted(InlongConstants.UN_DELETED);
         transformMapper.insert(transformEntity);
         saveFieldOpt(transformEntity, transformRequest.getFieldList());
         return transformEntity.getId();
@@ -221,7 +221,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
             fieldEntity.setRankNum(fieldInfo.getId());
             fieldEntity.setTransformId(transformId);
             fieldEntity.setTransformType(transformType);
-            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
             entityList.add(fieldEntity);
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 2baff6770..544e1b03b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.workflow.consumption.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.enums.MQType;
@@ -28,7 +29,6 @@ import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -127,7 +127,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
             PulsarTopicBean topicMessage = new PulsarTopicBean();
             String tenant = pulsarCluster.getTenant();
             if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
             }
             topicMessage.setTenant(tenant);
             topicMessage.setNamespace(mqResource);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 05c794da6..0b5216654 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.pojo.cluster.dataproxy.DataProxyClusterR
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterRequest;
 import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -210,7 +209,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         Assertions.assertNotNull(nodeId1);
 
         Integer port2 = 46801;
-        Integer nodeId2 = this.saveClusterNode(id, InlongGroupSettings.CLUSTER_DATA_PROXY, ip, port2);
+        Integer nodeId2 = this.saveClusterNode(id, ClusterType.CLS_DATA_PROXY, ip, port2);
         Assertions.assertNotNull(nodeId2);
 
         // Get the data proxy cluster ip list, the first port should is p1, second port is p2
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
index 281d20b22..b51002acc 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -64,7 +64,7 @@ public class ClickHouseStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setUsername(ckUsername);
         sinkInfo.setDbName(ckDatabaseName);
         sinkInfo.setTableName(ckTableName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         sinkInfo.setId((int) (Math.random() * 100000 + 1));
         return sinkService.save(sinkInfo, globalOperator);
     }
@@ -92,7 +92,7 @@ public class ClickHouseStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         ClickHouseSink ckSink = (ClickHouseSink) response;
-        ckSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        ckSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         SinkRequest request = ckSink.genSinkRequest();
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java
index 55df215e1..e16f34a67 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -61,7 +61,7 @@ public class DLCIcebergStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setInlongGroupId(globalGroupId);
         sinkInfo.setInlongStreamId(globalStreamId);
         sinkInfo.setSinkType(SinkType.SINK_DLCICEBERG);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         SinkField sinkField = new SinkField();
         sinkField.setFieldName(fieldName);
         sinkField.setFieldType(fieldType);
@@ -98,7 +98,7 @@ public class DLCIcebergStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         DLCIcebergSink dlcIcebergSink = (DLCIcebergSink) response;
-        dlcIcebergSink.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        dlcIcebergSink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         SinkRequest request = dlcIcebergSink.genSinkRequest();
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ElasticsearchStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ElasticsearchStreamSinkServiceTest.java
index f5bc530ea..3072d4a36 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ElasticsearchStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ElasticsearchStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -63,7 +63,7 @@ public class ElasticsearchStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setVersion(7);
 
         sinkInfo.setSinkName(sinkName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         return sinkService.save(sinkInfo, globalOperator);
     }
 
@@ -90,7 +90,7 @@ public class ElasticsearchStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         ElasticsearchSink elasticsearchSink = (ElasticsearchSink) response;
-        elasticsearchSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        elasticsearchSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         SinkRequest request = elasticsearchSink.genSinkRequest();
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
index 69f1abf3b..677880367 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -68,7 +68,7 @@ public class GreenplumStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setPrimaryKey("name,age");
 
         sinkInfo.setSinkName(sinkName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         SinkField sinkField = new SinkField();
         sinkField.setFieldName(fieldName);
         sinkField.setFieldType(fieldType);
@@ -102,7 +102,7 @@ public class GreenplumStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         GreenplumSink greenplumSink = (GreenplumSink) response;
-        greenplumSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        greenplumSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         SinkRequest request = greenplumSink.genSinkRequest();
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HBaseStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HBaseStreamSinkServiceTest.java
index cff4a028a..cee710ff5 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HBaseStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HBaseStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -55,7 +55,7 @@ public class HBaseStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setInlongGroupId(globalGroupId);
         sinkInfo.setInlongStreamId(globalStreamId);
         sinkInfo.setSinkType(SinkType.SINK_HBASE);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         sinkInfo.setSinkName(sinkName);
         sinkInfo.setTableName(tableName);
         sinkInfo.setNamespace(nameSpace);
@@ -87,7 +87,7 @@ public class HBaseStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         HBaseSink hbaseSink = (HBaseSink) response;
-        hbaseSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        hbaseSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         SinkRequest request = hbaseSink.genSinkRequest();
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java
index 86a44580a..85cb8f8bb 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -61,7 +61,7 @@ public class HdfsStreamSinkServiceTest extends ServiceBaseTest {
         hdfsSinkRequest.setInlongGroupId(globalGroupId);
         hdfsSinkRequest.setInlongStreamId(globalStreamId);
         hdfsSinkRequest.setSinkType(SinkType.SINK_HDFS);
-        hdfsSinkRequest.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        hdfsSinkRequest.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         hdfsSinkRequest.setSinkName(sinkName);
         hdfsSinkRequest.setFileFormat(fileFormat);
         hdfsSinkRequest.setDataPath(dataPath);
@@ -101,7 +101,7 @@ public class HdfsStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         HdfsSink hdfsSink = (HdfsSink) response;
-        hdfsSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        hdfsSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
 
         HdfsSinkRequest request = CommonBeanUtils.copyProperties(hdfsSink, HdfsSinkRequest::new);
         boolean result = sinkService.update(request, globalOperator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
index 07e4686e3..101750e30 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
@@ -55,7 +55,7 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setInlongGroupId(globalGroupId);
         sinkInfo.setInlongStreamId(globalStreamId);
         sinkInfo.setSinkType(SinkType.SINK_HIVE);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         sinkInfo.setSinkName(sinkName);
         return sinkService.save(sinkInfo, globalOperator);
     }
@@ -86,7 +86,7 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         HiveSink hiveResponse = (HiveSink) response;
-        hiveResponse.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        hiveResponse.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         HiveSinkRequest request = CommonBeanUtils.copyProperties(hiveResponse, HiveSinkRequest::new);
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
index c6fa760aa..b16a14010 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -54,7 +54,7 @@ public class IcebergStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setInlongGroupId(globalGroupId);
         sinkInfo.setInlongStreamId(globalStreamId);
         sinkInfo.setSinkType(SinkType.SINK_ICEBERG);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         sinkInfo.setDataPath("hdfs://127.0.0.1:8020/data");
         sinkInfo.setSinkName(sinkName);
         sinkInfo.setId((int) (Math.random() * 100000 + 1));
@@ -84,7 +84,7 @@ public class IcebergStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         IcebergSink icebergSink = (IcebergSink) response;
-        icebergSink.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        icebergSink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         SinkRequest request = icebergSink.genSinkRequest();
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
index 4dda378d3..031a52745 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -58,7 +58,7 @@ public class KafkaStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setSerializationType(serializationType);
         sinkInfo.setBootstrapServers(bootstrapServers);
         sinkInfo.setTopicName(topicName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         return sinkService.save(sinkInfo, GLOBAL_OPERATOR);
     }
 
@@ -85,7 +85,7 @@ public class KafkaStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
 
         KafkaSink kafkaSink = (KafkaSink) response;
-        kafkaSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        kafkaSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         SinkRequest request = kafkaSink.genSinkRequest();
         boolean result = sinkService.update(request, GLOBAL_OPERATOR);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/MysqlStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/MysqlStreamSinkServiceTest.java
index 9270be11c..08a497819 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/MysqlStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/MysqlStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -68,7 +68,7 @@ public class MysqlStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setPrimaryKey("name,age");
 
         sinkInfo.setSinkName(sinkName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         SinkField sinkField = new SinkField();
         sinkField.setFieldName(fieldName);
         sinkField.setFieldType(fieldType);
@@ -102,7 +102,7 @@ public class MysqlStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         MySQLSink mysqlSink = (MySQLSink) response;
-        mysqlSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        mysqlSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         MySQLSinkRequest request = CommonBeanUtils.copyProperties(mysqlSink,
                 MySQLSinkRequest::new);
         boolean result = sinkService.update(request, globalOperator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleStreamSinkServiceTest.java
index ed2075ee3..3b806544e 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -68,7 +68,7 @@ public class OracleStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setPrimaryKey("name,age");
 
         sinkInfo.setSinkName(sinkName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         SinkField sinkField = new SinkField();
         sinkField.setFieldName(fieldName);
         sinkField.setFieldType(fieldType);
@@ -102,7 +102,7 @@ public class OracleStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         OracleSink oracleSink = (OracleSink) response;
-        oracleSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        oracleSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
 
         OracleSinkRequest request = CommonBeanUtils.copyProperties(oracleSink,
                 OracleSinkRequest::new);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
index fed6f8c00..910656d78 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.core.sink;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -72,7 +72,7 @@ public class PostgresStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setPrimaryKey("name,age");
 
         sinkInfo.setSinkName(sinkName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         return sinkService.save(sinkInfo, globalOperator);
     }
 
@@ -99,7 +99,7 @@ public class PostgresStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         PostgresSink postgresSink = (PostgresSink) response;
-        postgresSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        postgresSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         SinkRequest request = postgresSink.genSinkRequest();
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
index 63ee46745..8775e25fe 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSink;
@@ -63,7 +63,7 @@ public class SqlServerStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setPrimaryKey("name,age");
 
         sinkInfo.setSinkName(sinkName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         return sinkService.save(sinkInfo, globalOperator);
     }
 
@@ -90,7 +90,7 @@ public class SqlServerStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         SqlServerSink sqlServerSink = (SqlServerSink) response;
-        sqlServerSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        sqlServerSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
 
         SqlServerSinkRequest request = CommonBeanUtils.copyProperties(sqlServerSink,
                 SqlServerSinkRequest::new);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/TDSQLPostgreSQLStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/TDSQLPostgreSQLStreamSinkServiceTest.java
index 002c7d3cc..e76699a2d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/TDSQLPostgreSQLStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/TDSQLPostgreSQLStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -69,7 +69,7 @@ class TDSQLPostgreSQLStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setPrimaryKey("name,age");
 
         sinkInfo.setSinkName(sinkName);
-        sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         SinkField sinkField = new SinkField();
         sinkField.setFieldName(fieldName);
         sinkField.setFieldType(fieldType);
@@ -103,7 +103,7 @@ class TDSQLPostgreSQLStreamSinkServiceTest extends ServiceBaseTest {
         Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
 
         TDSQLPostgreSQLSink tdsqlPostgreSQLSink = (TDSQLPostgreSQLSink) response;
-        tdsqlPostgreSQLSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        tdsqlPostgreSQLSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
 
         TDSQLPostgreSQLSinkRequest request = CommonBeanUtils.copyProperties(tdsqlPostgreSQLSink,
                 TDSQLPostgreSQLSinkRequest::new);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mq/util/PulsarUtilsTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mq/util/PulsarUtilsTest.java
index 65d9651aa..a99f4a1e4 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mq/util/PulsarUtilsTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mq/util/PulsarUtilsTest.java
@@ -17,76 +17,33 @@
 
 package org.apache.inlong.manager.service.mq.util;
 
-import com.google.common.collect.Lists;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 import org.springframework.util.ReflectionUtils;
 
 import java.lang.reflect.Field;
-import java.util.ArrayList;
 
 /**
- * Test class for pulsar utils.
+ * Test class for Pulsar utils.
  */
 public class PulsarUtilsTest {
 
-    // There will be concurrency problems in the overall operation,This method temporarily fails the test
-    // @Test
+    @Test
     public void testGetPulsarAdmin() {
-        InlongGroupExtInfo groupExtInfo1 = new InlongGroupExtInfo();
-        groupExtInfo1.setId(1);
-        groupExtInfo1.setInlongGroupId("group1");
-        groupExtInfo1.setKeyName(InlongGroupSettings.PULSAR_ADMIN_URL);
-        groupExtInfo1.setKeyValue("http://127.0.0.1:8080");
-
-        InlongGroupExtInfo groupExtInfo2 = new InlongGroupExtInfo();
-        groupExtInfo2.setId(2);
-        groupExtInfo2.setInlongGroupId("group1");
-        groupExtInfo2.setKeyName(InlongGroupSettings.PULSAR_AUTHENTICATION);
-        groupExtInfo2.setKeyValue("QWEASDZXC");
-        ArrayList<InlongGroupExtInfo> groupExtInfoList = Lists.newArrayList(groupExtInfo1, groupExtInfo2);
-        InlongPulsarInfo groupInfo = new InlongPulsarInfo();
-        groupInfo.setExtList(groupExtInfoList);
-
         final String defaultServiceUrl = "http://127.0.0.1:10080";
         try {
             PulsarAdmin admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
-            Assertions.assertEquals("http://127.0.0.1:8080", admin.getServiceUrl());
+            Assertions.assertEquals(defaultServiceUrl, admin.getServiceUrl());
             Field auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
             assert auth != null;
             auth.setAccessible(true);
             Authentication authentication = (Authentication) auth.get(admin);
             Assertions.assertNotNull(authentication);
-
-            InlongGroupExtInfo groupExtInfo3 = new InlongGroupExtInfo();
-            groupExtInfo3.setId(3);
-            groupExtInfo3.setInlongGroupId("group1");
-            groupExtInfo3.setKeyName(InlongGroupSettings.PULSAR_AUTHENTICATION_TYPE);
-            groupExtInfo3.setKeyValue("token1");
-            groupExtInfoList.add(groupExtInfo3);
-            try {
-                admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
-            } catch (Exception e) {
-                if (e instanceof IllegalArgumentException) {
-                    Assertions.assertTrue(e.getMessage().contains("illegal authentication type"));
-                }
-            }
-
-            groupExtInfoList = new ArrayList<>();
-            groupInfo.setExtList(groupExtInfoList);
-            admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
-            Assertions.assertEquals("http://127.0.0.1:10080", admin.getServiceUrl());
-            auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
-            assert auth != null;
-            auth.setAccessible(true);
-            authentication = (Authentication) auth.get(admin);
             Assertions.assertTrue(authentication instanceof AuthenticationDisabled);
         } catch (PulsarClientException | IllegalAccessException e) {
             Assertions.fail();