You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/27 06:07:36 UTC
[inlong] branch master updated: [INLONG-4780][Manager] Combine some constant classes into one (#4783)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e42cb6209 [INLONG-4780][Manager] Combine some constant classes into one (#4783)
e42cb6209 is described below
commit e42cb6209b860004a508c07fa0999185eed697d0
Author: healzhou <he...@gmail.com>
AuthorDate: Mon Jun 27 14:07:31 2022 +0800
[INLONG-4780][Manager] Combine some constant classes into one (#4783)
---
.../apache/inlong/manager/client/BaseExample.java | 4 +-
.../apache/inlong/manager/client/ut/BaseTest.java | 4 +-
.../client/api/util/InlongGroupTransfer.java | 43 +++++++++---------
.../InlongConstants.java} | 48 ++++++++------------
.../manager/common/enums/GlobalConstants.java | 39 -----------------
.../plugin/listener/DeleteSortListener.java | 8 ++--
.../plugin/listener/DeleteStreamListener.java | 8 ++--
.../plugin/listener/RestartSortListener.java | 10 ++---
.../plugin/listener/RestartStreamListener.java | 10 ++---
.../plugin/listener/StartupSortListener.java | 10 ++---
.../plugin/listener/StartupStreamListener.java | 10 ++---
.../plugin/listener/SuspendSortListener.java | 8 ++--
.../plugin/listener/SuspendStreamListener.java | 8 ++--
.../plugin/listener/DeleteSortListenerTest.java | 8 ++--
.../plugin/listener/RestartSortListenerTest.java | 10 ++---
.../plugin/listener/StartupSortListenerTest.java | 8 ++--
.../plugin/listener/SuspendSortListenerTest.java | 8 ++--
.../service/cluster/AbstractClusterOperator.java | 4 +-
.../service/cluster/InlongClusterServiceImpl.java | 13 +++---
.../service/core/impl/ConsumptionServiceImpl.java | 11 +++--
.../service/core/impl/DataNodeServiceImpl.java | 6 +--
.../service/core/impl/InlongStreamServiceImpl.java | 6 +--
.../core/impl/WorkflowApproverServiceImpl.java | 4 +-
.../service/group/AbstractGroupOperator.java | 4 +-
.../service/group/InlongPulsarOperator.java | 2 +-
.../service/mq/CreatePulsarGroupTaskListener.java | 4 +-
.../mq/CreatePulsarResourceTaskListener.java | 4 +-
.../service/mq/CreatePulsarTopicTaskListener.java | 4 +-
.../manager/service/mq/PulsarEventSelector.java | 4 +-
.../service/resource/SinkResourceListener.java | 4 +-
.../resource/StreamSinkResourceListener.java | 4 +-
.../resource/ck/ClickHouseResourceOperator.java | 4 +-
.../resource/es/ElasticsearchResourceOperator.java | 4 +-
.../resource/hbase/HBaseResourceOperator.java | 4 +-
.../resource/hive/HiveResourceOperator.java | 4 +-
.../resource/iceberg/IcebergResourceOperator.java | 4 +-
.../postgres/PostgresResourceOperator.java | 4 +-
.../manager/service/sink/AbstractSinkOperator.java | 6 +--
.../service/sink/StreamSinkServiceImpl.java | 6 +--
.../service/sink/iceberg/IcebergSinkOperator.java | 4 +-
.../service/sort/CreateSortConfigListener.java | 6 +--
.../service/sort/CreateSortConfigListenerV2.java | 8 ++--
.../sort/CreateStreamSortConfigListener.java | 4 +-
.../service/sort/light/LightGroupSortListener.java | 6 +--
.../manager/service/sort/util/DataFlowUtils.java | 4 +-
.../service/source/AbstractSourceOperator.java | 6 +--
.../transform/StreamTransformServiceImpl.java | 6 +--
.../ConsumptionCompleteProcessListener.java | 4 +-
.../service/cluster/InlongClusterServiceTest.java | 3 +-
.../core/sink/ClickHouseStreamSinkServiceTest.java | 6 +--
.../core/sink/DLCIcebergStreamSinkServiceTest.java | 6 +--
.../sink/ElasticsearchStreamSinkServiceTest.java | 6 +--
.../core/sink/GreenplumStreamSinkServiceTest.java | 6 +--
.../core/sink/HBaseStreamSinkServiceTest.java | 6 +--
.../core/sink/HdfsStreamSinkServiceTest.java | 6 +--
.../core/sink/HiveStreamSinkServiceTest.java | 6 +--
.../core/sink/IcebergStreamSinkServiceTest.java | 6 +--
.../core/sink/KafkaStreamSinkServiceTest.java | 6 +--
.../core/sink/MysqlStreamSinkServiceTest.java | 6 +--
.../core/sink/OracleStreamSinkServiceTest.java | 6 +--
.../core/sink/PostgresStreamSinkServiceTest.java | 6 +--
.../core/sink/SqlServerStreamSinkServiceTest.java | 6 +--
.../sink/TDSQLPostgreSQLStreamSinkServiceTest.java | 6 +--
.../manager/service/mq/util/PulsarUtilsTest.java | 51 ++--------------------
64 files changed, 222 insertions(+), 318 deletions(-)
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
index b17f78a70..268001f1d 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.FileFormat;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -111,7 +111,7 @@ public class BaseExample {
streamInfo.setDataEncoding(StandardCharsets.UTF_8.toString());
streamInfo.setDataSeparator(DataSeparator.VERTICAL_BAR.getSeparator());
// if you need strictly order for data, set to 1
- streamInfo.setSyncSend(GlobalConstants.SYNC_SEND);
+ streamInfo.setSyncSend(InlongConstants.SYNC_SEND);
streamInfo.setMqResource(this.getTopic());
return streamInfo;
}
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
index 6f6899f40..a5f3182f8 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -26,7 +26,7 @@ import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.FileFormat;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -142,7 +142,7 @@ public class BaseTest {
streamInfo.setDataEncoding(StandardCharsets.UTF_8.toString());
streamInfo.setDataSeparator(DataSeparator.VERTICAL_BAR.getSeparator());
// if you need strictly order for data, set to 1
- streamInfo.setSyncSend(GlobalConstants.SYNC_SEND);
+ streamInfo.setSyncSend(InlongConstants.SYNC_SEND);
streamInfo.setMqResource(TOPIC);
return streamInfo;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 26a604e1f..0a247d259 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -25,13 +25,13 @@ import org.apache.inlong.manager.common.auth.Authentication;
import org.apache.inlong.manager.common.auth.Authentication.AuthType;
import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.common.auth.TokenAuthentication;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sort.BaseSortConf;
import org.apache.inlong.manager.common.pojo.sort.BaseSortConf.SortType;
import org.apache.inlong.manager.common.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.common.pojo.sort.UserDefinedSortConf;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.AssertUtils;
import java.util.ArrayList;
@@ -47,34 +47,33 @@ public class InlongGroupTransfer {
/**
* Create inlong group info from group config.
*/
- public static InlongGroupInfo createGroupInfo(InlongGroupInfo originGroupInfo, BaseSortConf sortConf) {
- AssertUtils.notNull(originGroupInfo, "Inlong group info cannot be null");
- AssertUtils.hasLength(originGroupInfo.getInlongGroupId(), "groupId cannot be empty");
- originGroupInfo.setExtList(Lists.newArrayList());
+ public static InlongGroupInfo createGroupInfo(InlongGroupInfo groupInfo, BaseSortConf sortConf) {
+ AssertUtils.notNull(groupInfo, "Inlong group info cannot be null");
+ AssertUtils.hasLength(groupInfo.getInlongGroupId(), "groupId cannot be empty");
+ groupInfo.setExtList(Lists.newArrayList());
// set authentication into group ext list
List<InlongGroupExtInfo> extInfos = new ArrayList<>();
- if (originGroupInfo.getAuthentication() != null) {
- Authentication authentication = originGroupInfo.getAuthentication();
+ if (groupInfo.getAuthentication() != null) {
+ Authentication authentication = groupInfo.getAuthentication();
AuthType authType = authentication.getAuthType();
AssertUtils.isTrue(authType == AuthType.TOKEN,
String.format("Unsupported authentication:%s for pulsar", authType.name()));
TokenAuthentication tokenAuthentication = (TokenAuthentication) authentication;
InlongGroupExtInfo authTypeExt = new InlongGroupExtInfo();
- authTypeExt.setKeyName(InlongGroupSettings.PULSAR_AUTHENTICATION_TYPE);
+ authTypeExt.setKeyName(InlongConstants.PULSAR_AUTHENTICATION_TYPE);
authTypeExt.setKeyValue(tokenAuthentication.getAuthType().toString());
extInfos.add(authTypeExt);
InlongGroupExtInfo authValue = new InlongGroupExtInfo();
- authValue.setKeyName(InlongGroupSettings.PULSAR_AUTHENTICATION);
+ authValue.setKeyName(InlongConstants.PULSAR_AUTHENTICATION);
authValue.setKeyValue(tokenAuthentication.getToken());
extInfos.add(authValue);
- originGroupInfo.getExtList().addAll(extInfos);
+ groupInfo.getExtList().addAll(extInfos);
}
if (sortConf == null) {
- throw new IllegalArgumentException(
- String.format("sort config cannot be empty for group=", originGroupInfo.getInlongGroupId()));
+ throw new IllegalArgumentException("sort config cannot be empty for group=" + groupInfo.getInlongGroupId());
}
// set the sort config into ext list
SortType sortType = sortConf.getType();
@@ -90,8 +89,8 @@ public class InlongGroupTransfer {
sortExtInfos = new ArrayList<>();
}
- originGroupInfo.getExtList().addAll(sortExtInfos);
- return originGroupInfo;
+ groupInfo.getExtList().addAll(sortExtInfos);
+ return groupInfo;
}
/**
@@ -100,7 +99,7 @@ public class InlongGroupTransfer {
public static List<InlongGroupExtInfo> createFlinkExtInfo(FlinkSortConf flinkSortConf) {
List<InlongGroupExtInfo> extInfos = new ArrayList<>();
InlongGroupExtInfo sortType = new InlongGroupExtInfo();
- sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
+ sortType.setKeyName(InlongConstants.SORT_TYPE);
sortType.setKeyValue(SortType.FLINK.getType());
extInfos.add(sortType);
if (flinkSortConf.getAuthentication() != null) {
@@ -110,23 +109,23 @@ public class InlongGroupTransfer {
String.format("Unsupported authentication:%s for flink", authType.name()));
final SecretTokenAuthentication secretTokenAuthentication = (SecretTokenAuthentication) authentication;
InlongGroupExtInfo authTypeExt = new InlongGroupExtInfo();
- authTypeExt.setKeyName(InlongGroupSettings.SORT_AUTHENTICATION_TYPE);
+ authTypeExt.setKeyName(InlongConstants.SORT_AUTHENTICATION_TYPE);
authTypeExt.setKeyValue(authType.toString());
extInfos.add(authTypeExt);
InlongGroupExtInfo authValue = new InlongGroupExtInfo();
- authValue.setKeyName(InlongGroupSettings.SORT_AUTHENTICATION);
+ authValue.setKeyName(InlongConstants.SORT_AUTHENTICATION);
authValue.setKeyValue(secretTokenAuthentication.toString());
extInfos.add(authValue);
}
if (StringUtils.isNotEmpty(flinkSortConf.getServiceUrl())) {
InlongGroupExtInfo flinkUrl = new InlongGroupExtInfo();
- flinkUrl.setKeyName(InlongGroupSettings.SORT_URL);
+ flinkUrl.setKeyName(InlongConstants.SORT_URL);
flinkUrl.setKeyValue(flinkSortConf.getServiceUrl());
extInfos.add(flinkUrl);
}
if (MapUtils.isNotEmpty(flinkSortConf.getProperties())) {
InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
- flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+ flinkProperties.setKeyName(InlongConstants.SORT_PROPERTIES);
try {
flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(flinkSortConf.getProperties()));
} catch (Exception e) {
@@ -143,16 +142,16 @@ public class InlongGroupTransfer {
public static List<InlongGroupExtInfo> createUserDefinedSortExtInfo(UserDefinedSortConf userDefinedSortConf) {
List<InlongGroupExtInfo> extInfos = new ArrayList<>();
InlongGroupExtInfo sortType = new InlongGroupExtInfo();
- sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
+ sortType.setKeyName(InlongConstants.SORT_TYPE);
sortType.setKeyValue(SortType.USER_DEFINED.getType());
extInfos.add(sortType);
InlongGroupExtInfo sortName = new InlongGroupExtInfo();
- sortName.setKeyName(InlongGroupSettings.SORT_NAME);
+ sortName.setKeyName(InlongConstants.SORT_NAME);
sortName.setKeyValue(userDefinedSortConf.getSortName());
extInfos.add(sortName);
if (MapUtils.isNotEmpty(userDefinedSortConf.getProperties())) {
InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
- flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+ flinkProperties.setKeyName(InlongConstants.SORT_PROPERTIES);
try {
flinkProperties.setKeyValue(OBJECT_MAPPER.writeValueAsString(userDefinedSortConf.getProperties()));
} catch (Exception e) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
similarity index 68%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index d8fb440e6..f1c336d49 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -15,29 +15,30 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.settings;
+package org.apache.inlong.manager.common.consts;
-public class InlongGroupSettings {
+/**
+ * Global constant for the Inlong system.
+ */
+public class InlongConstants {
- /**
- * config of group
- */
- public static final String DATA_FLOW_GROUP_ID_KEY = "inlong.group.id";
+ public static final Integer UN_DELETED = 0;
- public static final String DATA_FLOW = "dataFlow";
+ public static final Integer IS_DELETED = 1;
- /**
- * Refer to{@link org.apache.inlong.manager.common.enums.GroupMode#getMode}
- */
- public static final String GROUP_MODE = "group.mode";
+ public static final Integer DELETED_STATUS = 10;
- /**
- * config of pulsar
- */
- public static final String PULSAR_ADMIN_URL = "pulsar_adminUrl";
+ public static final Integer DISABLE_CREATE_RESOURCE = 0;
+
+ public static final Integer ENABLE_CREATE_RESOURCE = 1;
+
+ public static final Integer SYNC_SEND = 1;
- public static final String PULSAR_SERVICE_URL = "pulsar_serviceUrl";
+ public static final Integer UN_SYNC_SEND = 0;
+ /**
+ * Pulsar config
+ */
public static final String PULSAR_AUTHENTICATION = "pulsar.authentication";
public static final String PULSAR_AUTHENTICATION_TYPE = "pulsar.authentication.type";
@@ -52,22 +53,11 @@ public class InlongGroupSettings {
public static final String PULSAR_TOPIC_FORMAT = "persistent://%s/%s/%s";
/**
- * config of tube mq
+ * Sort config
*/
- public static final String TUBE_MANAGER_URL = "tube.manager.url";
-
- public static final String TUBE_MASTER_URL = "tube.master.url";
- public static final String TUBE_CLUSTER_ID = "tube.cluster.id";
-
- /**
- * config of dataproxy
- */
- public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
+ public static final String DATA_FLOW = "dataFlow";
- /**
- * config of sort
- */
public static final String SORT_JOB_ID = "sort.job.id";
public static final String SORT_TYPE = "sort.type";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GlobalConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GlobalConstants.java
deleted file mode 100644
index 40be383bd..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GlobalConstants.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.enums;
-
-/**
- * Global constant for system
- */
-public class GlobalConstants {
-
- public static final Integer UN_DELETED = 0;
-
- public static final Integer IS_DELETED = 1;
-
- public static final Integer DELETED_STATUS = 10;
-
- public static final Integer DISABLE_CREATE_RESOURCE = 0;
-
- public static final Integer ENABLE_CREATE_RESOURCE = 1;
-
- public static final Integer SYNC_SEND = 1;
-
- public static final Integer UN_SYNC_SEND = 0;
-
-}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index ccdb162ef..8c3a0370c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -25,7 +25,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -70,7 +70,7 @@ public class DeleteSortListener implements SortOperateListener {
Map<String, String> kvConf = extList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
groupId);
@@ -82,7 +82,7 @@ public class DeleteSortListener implements SortOperateListener {
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s]", groupId);
return ListenerResult.fail(message);
@@ -90,7 +90,7 @@ public class DeleteSortListener implements SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 4b2b2ff65..926146a79 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -72,7 +72,7 @@ public class DeleteStreamListener implements SortOperateListener {
streamExtList.stream().forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format(
"delete sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
@@ -85,7 +85,7 @@ public class DeleteStreamListener implements SortOperateListener {
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
return ListenerResult.fail(message);
@@ -93,7 +93,7 @@ public class DeleteStreamListener implements SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 507972587..cd552d9d8 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -21,11 +21,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -71,7 +71,7 @@ public class RestartSortListener implements SortOperateListener {
Map<String, String> kvConf = extList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
groupId);
@@ -83,12 +83,12 @@ public class RestartSortListener implements SortOperateListener {
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s]", groupId);
return ListenerResult.fail(message);
}
- String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
+ String dataFlows = kvConf.get(InlongConstants.DATA_FLOW);
if (StringUtils.isEmpty(dataFlows)) {
String message = String.format("dataflow is empty for groupId [%s]", groupId);
log.error(message);
@@ -99,7 +99,7 @@ public class RestartSortListener implements SortOperateListener {
flinkInfo.setJobId(jobId);
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
flinkInfo.setJobName(jobName);
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 3974da351..08a554119 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -21,13 +21,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -73,7 +73,7 @@ public class RestartStreamListener implements SortOperateListener {
streamExtList.stream().forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format(
"restart sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
@@ -86,12 +86,12 @@ public class RestartStreamListener implements SortOperateListener {
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
return ListenerResult.fail(message);
}
- String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
+ String dataFlows = kvConf.get(InlongConstants.DATA_FLOW);
if (StringUtils.isEmpty(dataFlows)) {
String message = String.format("dataflow is empty for groupId [%s] streamId [%s]", groupId, streamId);
log.error(message);
@@ -102,7 +102,7 @@ public class RestartStreamListener implements SortOperateListener {
flinkInfo.setJobId(jobId);
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
flinkInfo.setJobName(jobName);
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 3d99974ab..76d81587d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -21,11 +21,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -73,7 +73,7 @@ public class StartupSortListener implements SortOperateListener {
&& StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
InlongGroupExtInfo::getKeyName,
InlongGroupExtInfo::getKeyValue));
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isNotEmpty(sortExt)) {
Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
new TypeReference<Map<String, String>>() {
@@ -81,7 +81,7 @@ public class StartupSortListener implements SortOperateListener {
kvConf.putAll(result);
}
- String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
+ String dataFlows = kvConf.get(InlongConstants.DATA_FLOW);
if (StringUtils.isEmpty(dataFlows)) {
String message = String.format("dataflow is empty for groupId [%s]", groupId);
log.error(message);
@@ -91,7 +91,7 @@ public class StartupSortListener implements SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
flinkInfo.setJobName(jobName);
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos());
@@ -113,7 +113,7 @@ public class StartupSortListener implements SortOperateListener {
return ListenerResult.fail(message + e.getMessage());
}
- saveInfo(groupId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), extList);
+ saveInfo(groupId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), extList);
flinkOperation.pollJobStatus(flinkInfo);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index b64633830..e37f91355 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -21,13 +21,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -74,7 +74,7 @@ public class StartupStreamListener implements SortOperateListener {
streamExtList.stream().forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isNotEmpty(sortExt)) {
Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
new TypeReference<Map<String, String>>() {
@@ -82,7 +82,7 @@ public class StartupStreamListener implements SortOperateListener {
kvConf.putAll(result);
}
- String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
+ String dataFlows = kvConf.get(InlongConstants.DATA_FLOW);
if (StringUtils.isEmpty(dataFlows)) {
String message = String.format("dataflow is empty for groupId [%s] and streamId [%s]", groupId, streamId);
log.error(message);
@@ -92,7 +92,7 @@ public class StartupStreamListener implements SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
flinkInfo.setJobName(jobName);
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
@@ -113,7 +113,7 @@ public class StartupStreamListener implements SortOperateListener {
return ListenerResult.fail(message + e.getMessage());
}
- saveInfo(groupId, streamId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), streamExtList);
+ saveInfo(groupId, streamId, InlongConstants.SORT_JOB_ID, flinkInfo.getJobId(), streamExtList);
flinkOperation.pollJobStatus(flinkInfo);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 16ebfd291..bba9c1e19 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -25,7 +25,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -70,7 +70,7 @@ public class SuspendSortListener implements SortOperateListener {
Map<String, String> kvConf = extList.stream().collect(
Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
groupId);
@@ -82,7 +82,7 @@ public class SuspendSortListener implements SortOperateListener {
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s]", groupId);
return ListenerResult.fail(message);
@@ -90,7 +90,7 @@ public class SuspendSortListener implements SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index e5cb9cb7b..ee54b9571 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -21,13 +21,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -72,7 +72,7 @@ public class SuspendStreamListener implements SortOperateListener {
streamExtList.stream().forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
- String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
+ String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format(
"suspend sort failed for groupId [%s] streamId [%s], as the sort properties is empty",
@@ -85,7 +85,7 @@ public class SuspendStreamListener implements SortOperateListener {
new TypeReference<Map<String, String>>() {
});
kvConf.putAll(result);
- String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
+ String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
return ListenerResult.fail(message);
@@ -93,7 +93,7 @@ public class SuspendStreamListener implements SortOperateListener {
FlinkInfo flinkInfo = new FlinkInfo();
flinkInfo.setJobId(jobId);
- String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
+ String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
index e91dacde5..58527432b 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.junit.jupiter.api.Test;
@@ -46,13 +46,13 @@ public class DeleteSortListenerTest {
groupResourceProcessForm.setGroupInfo(pulsarInfo);
InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
- inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
+ inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
inlongGroupExtInfo1.setKeyValue("127.0.0.1:8085");
List<InlongGroupExtInfo> inlongGroupExtInfos = new ArrayList<>();
inlongGroupExtInfos.add(inlongGroupExtInfo1);
InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
- inlongGroupExtInfo2.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+ inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> sortProperties = new HashMap<>(16);
String sortStr = objectMapper.writeValueAsString(sortProperties);
@@ -60,7 +60,7 @@ public class DeleteSortListenerTest {
inlongGroupExtInfos.add(inlongGroupExtInfo2);
InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
- inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
+ inlongGroupExtInfo5.setKeyName(InlongConstants.SORT_JOB_ID);
inlongGroupExtInfo5.setKeyValue("d7e613fb18876f173ec5ba17465fae64");
inlongGroupExtInfos.add(inlongGroupExtInfo5);
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
index 906ecbad7..9134eb704 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
@@ -18,10 +18,10 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.junit.jupiter.api.Test;
@@ -45,13 +45,13 @@ public class RestartSortListenerTest {
groupResourceProcessForm.setGroupInfo(pulsarInfo);
InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
- inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
+ inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
inlongGroupExtInfo1.setKeyValue("127.0.0.1:8081");
List<InlongGroupExtInfo> inlongGroupExtInfoList = new ArrayList<>();
inlongGroupExtInfoList.add(inlongGroupExtInfo1);
InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
- inlongGroupExtInfo2.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+ inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> sortProperties = new HashMap<>(16);
String sortStr = objectMapper.writeValueAsString(sortProperties);
@@ -59,12 +59,12 @@ public class RestartSortListenerTest {
inlongGroupExtInfoList.add(inlongGroupExtInfo2);
InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
- inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
+ inlongGroupExtInfo5.setKeyName(InlongConstants.SORT_JOB_ID);
inlongGroupExtInfo5.setKeyValue("efdc85a977e72e0d9a99170d78f03ddb");
inlongGroupExtInfoList.add(inlongGroupExtInfo5);
InlongGroupExtInfo inlongGroupExtInfo6 = new InlongGroupExtInfo();
- inlongGroupExtInfo6.setKeyName(InlongGroupSettings.DATA_FLOW);
+ inlongGroupExtInfo6.setKeyName(InlongConstants.DATA_FLOW);
inlongGroupExtInfo6.setKeyValue("{\"streamId\":{\n"
+ " \"id\":1,\n"
+ " \"source_info\":{\n"
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
index 5f6f97d03..cafec2a56 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
@@ -18,10 +18,10 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.junit.jupiter.api.Test;
@@ -48,12 +48,12 @@ public class StartupSortListenerTest {
List<InlongGroupExtInfo> inlongGroupExtInfos = new ArrayList<>();
InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
- inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
+ inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
inlongGroupExtInfo1.setKeyValue("127.0.0.1:8085");
inlongGroupExtInfos.add(inlongGroupExtInfo1);
InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
- inlongGroupExtInfo2.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+ inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> sortProperties = new HashMap<>(16);
String sortStr = objectMapper.writeValueAsString(sortProperties);
@@ -61,7 +61,7 @@ public class StartupSortListenerTest {
inlongGroupExtInfos.add(inlongGroupExtInfo2);
InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
- inlongGroupExtInfo5.setKeyName(InlongGroupSettings.DATA_FLOW);
+ inlongGroupExtInfo5.setKeyName(InlongConstants.DATA_FLOW);
inlongGroupExtInfo5.setKeyValue("{\"streamId\":{\n"
+ " \"id\": 1,\n"
+ " \"source_info\":\n"
diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
index 4ea39ba8f..f9f09a2c1 100644
--- a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
+++ b/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
@@ -18,10 +18,10 @@
package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.junit.jupiter.api.Test;
@@ -45,13 +45,13 @@ public class SuspendSortListenerTest {
groupResourceProcessForm.setGroupInfo(pulsarInfo);
InlongGroupExtInfo inlongGroupExtInfo1 = new InlongGroupExtInfo();
- inlongGroupExtInfo1.setKeyName(InlongGroupSettings.SORT_URL);
+ inlongGroupExtInfo1.setKeyName(InlongConstants.SORT_URL);
inlongGroupExtInfo1.setKeyValue("127.0.0.1:8085");
List<InlongGroupExtInfo> inlongGroupExtInfos = new ArrayList<>();
inlongGroupExtInfos.add(inlongGroupExtInfo1);
InlongGroupExtInfo inlongGroupExtInfo2 = new InlongGroupExtInfo();
- inlongGroupExtInfo2.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+ inlongGroupExtInfo2.setKeyName(InlongConstants.SORT_PROPERTIES);
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> sortProperties = new HashMap<>(16);
String sortStr = objectMapper.writeValueAsString(sortProperties);
@@ -59,7 +59,7 @@ public class SuspendSortListenerTest {
inlongGroupExtInfos.add(inlongGroupExtInfo2);
InlongGroupExtInfo inlongGroupExtInfo5 = new InlongGroupExtInfo();
- inlongGroupExtInfo5.setKeyName(InlongGroupSettings.SORT_JOB_ID);
+ inlongGroupExtInfo5.setKeyName(InlongConstants.SORT_JOB_ID);
inlongGroupExtInfo5.setKeyValue("ea405ab424cfc35ae9be93df8ea87917");
inlongGroupExtInfos.add(inlongGroupExtInfo5);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
index ef799913b..807988ff3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.cluster;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
@@ -45,7 +45,7 @@ public abstract class AbstractClusterOperator implements InlongClusterOperator {
entity.setCreator(operator);
entity.setCreateTime(new Date());
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
clusterMapper.insert(entity);
return entity.getId();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 2f051ee0c..47ba191c0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -28,9 +28,9 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -44,7 +44,6 @@ import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
@@ -194,7 +193,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
public Boolean delete(Integer id, String operator) {
Preconditions.checkNotNull(id, "cluster id cannot be empty");
InlongClusterEntity entity = clusterMapper.selectById(id);
- if (entity == null || entity.getIsDeleted() > GlobalConstants.UN_DELETED) {
+ if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
LOGGER.error("inlong cluster not found by id={}, or was already deleted", id);
return false;
}
@@ -223,7 +222,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
InlongClusterNodeEntity entity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
entity.setCreator(operator);
entity.setCreateTime(new Date());
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
clusterNodeMapper.insert(entity);
LOGGER.info("success to add inlong cluster node={}", request);
@@ -308,7 +307,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
public Boolean deleteNode(Integer id, String operator) {
Preconditions.checkNotNull(id, "cluster node id cannot be empty");
InlongClusterNodeEntity entity = clusterNodeMapper.selectById(id);
- if (entity == null || entity.getIsDeleted() > GlobalConstants.UN_DELETED) {
+ if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
LOGGER.error("inlong cluster node not found by id={}", id);
return false;
}
@@ -404,11 +403,11 @@ public class InlongClusterServiceImpl implements InlongClusterService {
PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams());
String tenant = pulsarCluster.getTenant();
if (StringUtils.isBlank(tenant)) {
- tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
}
String streamId = streamInfo.getInlongStreamId();
- String topic = String.format(InlongGroupSettings.PULSAR_TOPIC_FORMAT,
+ String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
tenant, mqResource, streamInfo.getMqResource());
DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
topicConfig.setInlongGroupId(groupId + "/" + streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index f1b0c5d1b..e8a7f578b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -22,10 +22,10 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
@@ -40,7 +40,6 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.user.UserRoleCode;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.LoginUserUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -346,7 +345,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
entity.setFilterEnabled(0);
entity.setStatus(ConsumptionStatus.APPROVED.getStatus());
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
entity.setCreator(groupInfo.getCreator());
entity.setCreateTime(new Date());
@@ -357,7 +356,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
pulsarEntity.setConsumptionId(entity.getId());
pulsarEntity.setConsumerGroup(consumerGroup);
pulsarEntity.setInlongGroupId(groupId);
- pulsarEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+ pulsarEntity.setIsDeleted(InlongConstants.UN_DELETED);
consumptionPulsarMapper.insert(pulsarEntity);
}
@@ -413,8 +412,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
inlongGroupEntity.getInlongClusterTag(), null, ClusterType.CLS_PULSAR);
String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
- ? InlongGroupSettings.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
- info.setTopic(String.format(InlongGroupSettings.PULSAR_TOPIC_FORMAT, tenant,
+ ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
+ info.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
inlongGroupEntity.getMqResource(), info.getTopic()));
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
index 2a72625ec..fcb834fd4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
@@ -21,7 +21,7 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.inlong.manager.common.enums.DataNodeType;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.node.DataNodePageRequest;
import org.apache.inlong.manager.common.pojo.node.DataNodeRequest;
@@ -72,7 +72,7 @@ public class DataNodeServiceImpl implements DataNodeService {
DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
entity.setCreator(operator);
entity.setCreateTime(new Date());
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
dataNodeMapper.insert(entity);
LOGGER.debug("success to save data node={}", request);
@@ -138,7 +138,7 @@ public class DataNodeServiceImpl implements DataNodeService {
public Boolean delete(Integer id, String operator) {
Preconditions.checkNotNull(id, "data node id cannot be empty");
DataNodeEntity entity = dataNodeMapper.selectById(id);
- if (entity == null || entity.getIsDeleted() > GlobalConstants.UN_DELETED) {
+ if (entity == null || entity.getIsDeleted() > InlongConstants.UN_DELETED) {
LOGGER.error("data node not found or was already deleted for id={}", id);
return false;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 121c25e50..e220b3285 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -23,7 +23,7 @@ import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -486,7 +486,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
streamEntity.setMaxLength(1000);
streamEntity.setStatus(StreamStatus.CONFIG_SUCCESSFUL.getCode());
- streamEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+ streamEntity.setIsDeleted(InlongConstants.UN_DELETED);
streamEntity.setCreator(operator);
streamEntity.setModifier(operator);
Date now = new Date();
@@ -531,7 +531,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
for (InlongStreamFieldEntity entity : list) {
entity.setInlongGroupId(groupId);
entity.setInlongStreamId(streamId);
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
}
streamFieldMapper.insertAll(list);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
index 20cb9ebb3..961459fb5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.core.impl;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.pojo.workflow.FilterKey;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowApprover;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
@@ -119,7 +119,7 @@ public class WorkflowApproverServiceImpl implements WorkflowApproverService {
Preconditions.checkEmpty(exist, "already exit the same config");
WorkflowApproverEntity entity = CommonBeanUtils.copyProperties(approver, WorkflowApproverEntity::new);
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
int success = this.workflowApproverMapper.insert(entity);
Preconditions.checkTrue(success == 1, "add failed");
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
index e4aed9328..ba736d65f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.group;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
@@ -54,7 +54,7 @@ public abstract class AbstractGroupOperator implements InlongGroupOperator {
// after saving, the status is set to [GROUP_WAIT_SUBMIT]
entity.setStatus(GroupStatus.TO_BE_SUBMIT.getCode());
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
entity.setCreator(operator);
entity.setCreateTime(new Date());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
index 8b27040d2..4d9b700a7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
@@ -110,7 +110,7 @@ public class InlongPulsarOperator extends AbstractGroupOperator {
// TODO add cache for cluster info
// pulsar topic corresponds to the inlong stream one-to-one
// topicInfo.setDsTopicList(streamService.getTopicList(groupInfo.getInlongGroupId()));
- // commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL);
+ // commonOperateService.getSpecifiedParam(InlongConstants.TUBE_MASTER_URL);
// groupInfo.setTenant();
// groupInfo.setAdminUrl();
// groupInfo.setServiceUrl();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
index e50e14f37..460dbf928 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
@@ -27,7 +27,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -90,7 +90,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
- tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
}
String namespace = groupInfo.getMqResource();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
index 146669fad..bdc45b613 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -107,7 +107,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
// create pulsar tenant
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
- tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
}
pulsarOperator.createTenant(pulsarAdmin, tenant);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
index 5b7e63725..3a506aa4c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
@@ -28,7 +29,6 @@ import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
@@ -74,7 +74,7 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
- tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
}
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
index 8a6c45d18..d9f33b2a9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
@@ -46,7 +46,7 @@ public class PulsarEventSelector implements EventSelector {
MQType mqType = MQType.forType(groupInfo.getMqType());
if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
- boolean enable = GlobalConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+ boolean enable = InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
if (enable) {
log.info("need to create pulsar resource as the createResource was true for groupId [{}]", groupId);
return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
index 029958a92..2e0450479 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -66,7 +66,7 @@ public class SinkResourceListener implements SinkOperateListener {
}
List<SinkInfo> configList = sinkMapper.selectAllConfig(groupId, streamIdList);
List<SinkInfo> needCreateList = configList.stream()
- .filter(sinkInfo -> GlobalConstants.ENABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
+ .filter(sinkInfo -> InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(needCreateList)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
index 0a28bd9c4..cde624ed7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.service.resource;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -63,7 +63,7 @@ public class StreamSinkResourceListener implements SinkOperateListener {
List<SinkInfo> sinkInfos = sinkEntityMapper.selectAllConfig(groupId, Lists.newArrayList(streamId));
List<SinkInfo> needCreateResources = sinkInfos.stream()
- .filter(sinkInfo -> GlobalConstants.ENABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
+ .filter(sinkInfo -> InlongConstants.ENABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(needCreateResources)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java
index 91fd00b36..f055d1344 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/ck/ClickHouseResourceOperator.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.resource.ck;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -67,7 +67,7 @@ public class ClickHouseResourceOperator implements SinkResourceOperator {
if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
return;
- } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+ } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
return;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
index f5cc4bf0f..fb6c4883c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/es/ElasticsearchResourceOperator.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.resource.es;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -68,7 +68,7 @@ public class ElasticsearchResourceOperator implements SinkResourceOperator {
if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
return;
- } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+ } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
return;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HBaseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HBaseResourceOperator.java
index f6c66dc1b..4346fff41 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HBaseResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hbase/HBaseResourceOperator.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.resource.hbase;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -73,7 +73,7 @@ public class HBaseResourceOperator implements SinkResourceOperator {
if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
return;
- } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+ } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
return;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
index 1fb67bf45..40b4349ff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/hive/HiveResourceOperator.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.resource.hive;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -70,7 +70,7 @@ public class HiveResourceOperator implements SinkResourceOperator {
if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
return;
- } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+ } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
return;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
index bb1857eb8..2beafec60 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/iceberg/IcebergResourceOperator.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.resource.iceberg;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -70,7 +70,7 @@ public class IcebergResourceOperator implements SinkResourceOperator {
if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create");
return;
- } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+ } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
return;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java
index d4ab18bc0..74e00f3ba 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.resource.postgres;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
@@ -69,7 +69,7 @@ public class PostgresResourceOperator implements SinkResourceOperator {
if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
LOGGER.warn("postgres resource [" + sinkInfo.getId() + "] already success, skip to create");
return;
- } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+ } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
LOGGER.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
return;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 3cc13ff45..f422e553b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -81,7 +81,7 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
public Integer saveOpt(SinkRequest request, String operator) {
StreamSinkEntity entity = CommonBeanUtils.copyProperties(request, StreamSinkEntity::new);
entity.setStatus(SinkStatus.NEW.getCode());
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
entity.setCreator(operator);
entity.setModifier(operator);
Date now = new Date();
@@ -163,7 +163,7 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
fieldEntity.setInlongStreamId(streamId);
fieldEntity.setSinkType(sinkType);
fieldEntity.setSinkId(sinkId);
- fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+ fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
entityList.add(fieldEntity);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 13091daf3..4d396b614 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -25,7 +25,7 @@ import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
@@ -252,7 +252,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
entity.setPreviousStatus(entity.getStatus());
- entity.setStatus(GlobalConstants.DELETED_STATUS);
+ entity.setStatus(InlongConstants.DELETED_STATUS);
entity.setIsDeleted(id);
entity.setModifier(operator);
entity.setModifyTime(new Date());
@@ -279,7 +279,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
entityList.forEach(entity -> {
Integer id = entity.getId();
entity.setPreviousStatus(entity.getStatus());
- entity.setStatus(GlobalConstants.DELETED_STATUS);
+ entity.setStatus(InlongConstants.DELETED_STATUS);
entity.setIsDeleted(id);
entity.setModifier(operator);
entity.setModifyTime(now);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index 1f5c204d4..f7d397e9f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -24,7 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -149,7 +149,7 @@ public class IcebergSinkOperator extends AbstractSinkOperator {
fieldEntity.setInlongStreamId(streamId);
fieldEntity.setSinkType(sinkType);
fieldEntity.setSinkId(sinkId);
- fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+ fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
entityList.add(fieldEntity);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
index 19cf294d8..0369f6a2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListener.java
@@ -21,13 +21,13 @@ import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.DataFlowUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -93,7 +93,7 @@ public class CreateSortConfigListener implements SortOperateListener {
// String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
extInfo.setInlongGroupId(groupId);
- extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+ extInfo.setKeyName(InlongConstants.DATA_FLOW);
// extInfo.setKeyValue(dataFlows);
if (groupInfo.getExtList() == null) {
groupInfo.setExtList(Lists.newArrayList());
@@ -107,7 +107,7 @@ public class CreateSortConfigListener implements SortOperateListener {
}
private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
- groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
+ groupInfo.getExtList().removeIf(ext -> InlongConstants.DATA_FLOW.equals(ext.getKeyName()));
groupInfo.getExtList().add(extInfo);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 5276fd5b8..1aa87a37d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.MQType;
@@ -38,7 +39,6 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
@@ -96,7 +96,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
extInfo.setInlongGroupId(groupId);
- extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+ extInfo.setKeyName(InlongConstants.DATA_FLOW);
extInfo.setKeyValue(dataFlows);
if (groupInfo.getExtList() == null) {
groupInfo.setExtList(Lists.newArrayList());
@@ -144,7 +144,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
String adminUrl = pulsarCluster.getAdminUrl();
String serviceUrl = pulsarCluster.getUrl();
- String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) ? InlongGroupSettings.DEFAULT_PULSAR_TENANT
+ String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) ? InlongConstants.DEFAULT_PULSAR_TENANT
: pulsarCluster.getTenant();
streamInfoList.forEach(streamInfo -> {
PulsarSource pulsarSource = new PulsarSource();
@@ -194,7 +194,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
}
private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
- groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
+ groupInfo.getExtList().removeIf(ext -> InlongConstants.DATA_FLOW.equals(ext.getKeyName()));
groupInfo.getExtList().add(extInfo);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 22723787c..8c144b023 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -37,7 +37,7 @@ import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
@@ -105,7 +105,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
extInfo.setInlongGroupId(groupId);
extInfo.setInlongStreamId(streamId);
- String keyName = InlongGroupSettings.DATA_FLOW;
+ String keyName = InlongConstants.DATA_FLOW;
extInfo.setKeyName(keyName);
extInfo.setKeyValue(dataFlows);
if (streamInfo.getExtList() == null) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
index 6308b70cd..2311e2834 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sort.light;
import com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -27,7 +28,6 @@ import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.common.pojo.workflow.form.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
@@ -86,7 +86,7 @@ public class LightGroupSortListener implements SortOperateListener {
String dataFlows = OBJECT_MAPPER.writeValueAsString(configInfo);
InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
extInfo.setInlongGroupId(groupId);
- extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+ extInfo.setKeyName(InlongConstants.DATA_FLOW);
extInfo.setKeyValue(dataFlows);
if (groupInfo.getExtList() == null) {
groupInfo.setExtList(Lists.newArrayList());
@@ -145,7 +145,7 @@ public class LightGroupSortListener implements SortOperateListener {
}
private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
- groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
+ groupInfo.getExtList().removeIf(ext -> InlongConstants.DATA_FLOW.equals(ext.getKeyName()));
groupInfo.getExtList().add(extInfo);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
index 51767692b..6764219b9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
@@ -69,7 +69,7 @@ public class DataFlowUtils {
FieldMappingRule fieldMappingRule = new FieldMappingRule(mappingUnitList.toArray(new FieldMappingUnit[0]));
// Get source info
- String masterAddress = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL);
+ String masterAddress = commonOperateService.getSpecifiedParam(InlongConstants.TUBE_MASTER_URL);
PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
org.apache.inlong.sort.protocol.source.SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster,
masterAddress, clusterBean,
@@ -86,7 +86,7 @@ public class DataFlowUtils {
if (MapUtils.isNotEmpty(streamSink.getProperties())) {
properties.putAll(streamSink.getProperties());
}
- properties.put(InlongGroupSettings.DATA_FLOW_GROUP_ID_KEY, groupId);
+ properties.put(InlongConstants.DATA_FLOW_GROUP_ID_KEY, groupId);
return new DataFlowInfo(streamSink.getId(), sourceInfo, transInfo, sinkInfo, properties);
}*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 94a72b35d..8c1281d29 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.service.source;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -103,7 +103,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
Date now = new Date();
entity.setCreateTime(now);
entity.setModifyTime(now);
- entity.setIsDeleted(GlobalConstants.UN_DELETED);
+ entity.setIsDeleted(InlongConstants.UN_DELETED);
// get the ext params
setTargetEntity(request, entity);
sourceMapper.insert(entity);
@@ -236,7 +236,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
fieldEntity.setInlongStreamId(streamId);
fieldEntity.setSourceId(sourceId);
fieldEntity.setSourceType(sourceType);
- fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+ fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
entityList.add(fieldEntity);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 9865d9660..0badb762f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -22,7 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
@@ -86,7 +86,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
Date now = new Date();
transformEntity.setCreateTime(now);
transformEntity.setModifyTime(now);
- transformEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+ transformEntity.setIsDeleted(InlongConstants.UN_DELETED);
transformMapper.insert(transformEntity);
saveFieldOpt(transformEntity, transformRequest.getFieldList());
return transformEntity.getId();
@@ -221,7 +221,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
fieldEntity.setRankNum(fieldInfo.getId());
fieldEntity.setTransformId(transformId);
fieldEntity.setTransformType(transformType);
- fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+ fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
entityList.add(fieldEntity);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 2baff6770..544e1b03b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.workflow.consumption.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.MQType;
@@ -28,7 +29,6 @@ import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -127,7 +127,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
PulsarTopicBean topicMessage = new PulsarTopicBean();
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
- tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
}
topicMessage.setTenant(tenant);
topicMessage.setNamespace(mqResource);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 05c794da6..0b5216654 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.pojo.cluster.dataproxy.DataProxyClusterR
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterRequest;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyNodeInfo;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -210,7 +209,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
Assertions.assertNotNull(nodeId1);
Integer port2 = 46801;
- Integer nodeId2 = this.saveClusterNode(id, InlongGroupSettings.CLUSTER_DATA_PROXY, ip, port2);
+ Integer nodeId2 = this.saveClusterNode(id, ClusterType.CLS_DATA_PROXY, ip, port2);
Assertions.assertNotNull(nodeId2);
// Get the data proxy cluster ip list, the first port should is p1, second port is p2
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
index 281d20b22..b51002acc 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -64,7 +64,7 @@ public class ClickHouseStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setUsername(ckUsername);
sinkInfo.setDbName(ckDatabaseName);
sinkInfo.setTableName(ckTableName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
sinkInfo.setId((int) (Math.random() * 100000 + 1));
return sinkService.save(sinkInfo, globalOperator);
}
@@ -92,7 +92,7 @@ public class ClickHouseStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
ClickHouseSink ckSink = (ClickHouseSink) response;
- ckSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ ckSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
SinkRequest request = ckSink.genSinkRequest();
boolean result = sinkService.update(request, globalOperator);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java
index 55df215e1..e16f34a67 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/DLCIcebergStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -61,7 +61,7 @@ public class DLCIcebergStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setInlongGroupId(globalGroupId);
sinkInfo.setInlongStreamId(globalStreamId);
sinkInfo.setSinkType(SinkType.SINK_DLCICEBERG);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
SinkField sinkField = new SinkField();
sinkField.setFieldName(fieldName);
sinkField.setFieldType(fieldType);
@@ -98,7 +98,7 @@ public class DLCIcebergStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
DLCIcebergSink dlcIcebergSink = (DLCIcebergSink) response;
- dlcIcebergSink.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ dlcIcebergSink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
SinkRequest request = dlcIcebergSink.genSinkRequest();
boolean result = sinkService.update(request, globalOperator);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ElasticsearchStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ElasticsearchStreamSinkServiceTest.java
index f5bc530ea..3072d4a36 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ElasticsearchStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ElasticsearchStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -63,7 +63,7 @@ public class ElasticsearchStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setVersion(7);
sinkInfo.setSinkName(sinkName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
return sinkService.save(sinkInfo, globalOperator);
}
@@ -90,7 +90,7 @@ public class ElasticsearchStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
ElasticsearchSink elasticsearchSink = (ElasticsearchSink) response;
- elasticsearchSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ elasticsearchSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
SinkRequest request = elasticsearchSink.genSinkRequest();
boolean result = sinkService.update(request, globalOperator);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
index 69f1abf3b..677880367 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/GreenplumStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -68,7 +68,7 @@ public class GreenplumStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setPrimaryKey("name,age");
sinkInfo.setSinkName(sinkName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
SinkField sinkField = new SinkField();
sinkField.setFieldName(fieldName);
sinkField.setFieldType(fieldType);
@@ -102,7 +102,7 @@ public class GreenplumStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
GreenplumSink greenplumSink = (GreenplumSink) response;
- greenplumSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ greenplumSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
SinkRequest request = greenplumSink.genSinkRequest();
boolean result = sinkService.update(request, globalOperator);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HBaseStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HBaseStreamSinkServiceTest.java
index cff4a028a..cee710ff5 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HBaseStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HBaseStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -55,7 +55,7 @@ public class HBaseStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setInlongGroupId(globalGroupId);
sinkInfo.setInlongStreamId(globalStreamId);
sinkInfo.setSinkType(SinkType.SINK_HBASE);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
sinkInfo.setSinkName(sinkName);
sinkInfo.setTableName(tableName);
sinkInfo.setNamespace(nameSpace);
@@ -87,7 +87,7 @@ public class HBaseStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
HBaseSink hbaseSink = (HBaseSink) response;
- hbaseSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ hbaseSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
SinkRequest request = hbaseSink.genSinkRequest();
boolean result = sinkService.update(request, globalOperator);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java
index 86a44580a..85cb8f8bb 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HdfsStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -61,7 +61,7 @@ public class HdfsStreamSinkServiceTest extends ServiceBaseTest {
hdfsSinkRequest.setInlongGroupId(globalGroupId);
hdfsSinkRequest.setInlongStreamId(globalStreamId);
hdfsSinkRequest.setSinkType(SinkType.SINK_HDFS);
- hdfsSinkRequest.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ hdfsSinkRequest.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
hdfsSinkRequest.setSinkName(sinkName);
hdfsSinkRequest.setFileFormat(fileFormat);
hdfsSinkRequest.setDataPath(dataPath);
@@ -101,7 +101,7 @@ public class HdfsStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
HdfsSink hdfsSink = (HdfsSink) response;
- hdfsSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ hdfsSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
HdfsSinkRequest request = CommonBeanUtils.copyProperties(hdfsSink, HdfsSinkRequest::new);
boolean result = sinkService.update(request, globalOperator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
index 07e4686e3..101750e30 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
@@ -55,7 +55,7 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setInlongGroupId(globalGroupId);
sinkInfo.setInlongStreamId(globalStreamId);
sinkInfo.setSinkType(SinkType.SINK_HIVE);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
sinkInfo.setSinkName(sinkName);
return sinkService.save(sinkInfo, globalOperator);
}
@@ -86,7 +86,7 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
HiveSink hiveResponse = (HiveSink) response;
- hiveResponse.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ hiveResponse.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
HiveSinkRequest request = CommonBeanUtils.copyProperties(hiveResponse, HiveSinkRequest::new);
boolean result = sinkService.update(request, globalOperator);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
index c6fa760aa..b16a14010 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -54,7 +54,7 @@ public class IcebergStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setInlongGroupId(globalGroupId);
sinkInfo.setInlongStreamId(globalStreamId);
sinkInfo.setSinkType(SinkType.SINK_ICEBERG);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
sinkInfo.setDataPath("hdfs://127.0.0.1:8020/data");
sinkInfo.setSinkName(sinkName);
sinkInfo.setId((int) (Math.random() * 100000 + 1));
@@ -84,7 +84,7 @@ public class IcebergStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
IcebergSink icebergSink = (IcebergSink) response;
- icebergSink.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ icebergSink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
SinkRequest request = icebergSink.genSinkRequest();
boolean result = sinkService.update(request, globalOperator);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
index 4dda378d3..031a52745 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -58,7 +58,7 @@ public class KafkaStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setSerializationType(serializationType);
sinkInfo.setBootstrapServers(bootstrapServers);
sinkInfo.setTopicName(topicName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
return sinkService.save(sinkInfo, GLOBAL_OPERATOR);
}
@@ -85,7 +85,7 @@ public class KafkaStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
KafkaSink kafkaSink = (KafkaSink) response;
- kafkaSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ kafkaSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
SinkRequest request = kafkaSink.genSinkRequest();
boolean result = sinkService.update(request, GLOBAL_OPERATOR);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/MysqlStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/MysqlStreamSinkServiceTest.java
index 9270be11c..08a497819 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/MysqlStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/MysqlStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -68,7 +68,7 @@ public class MysqlStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setPrimaryKey("name,age");
sinkInfo.setSinkName(sinkName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
SinkField sinkField = new SinkField();
sinkField.setFieldName(fieldName);
sinkField.setFieldType(fieldType);
@@ -102,7 +102,7 @@ public class MysqlStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
MySQLSink mysqlSink = (MySQLSink) response;
- mysqlSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ mysqlSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
MySQLSinkRequest request = CommonBeanUtils.copyProperties(mysqlSink,
MySQLSinkRequest::new);
boolean result = sinkService.update(request, globalOperator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleStreamSinkServiceTest.java
index ed2075ee3..3b806544e 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -68,7 +68,7 @@ public class OracleStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setPrimaryKey("name,age");
sinkInfo.setSinkName(sinkName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
SinkField sinkField = new SinkField();
sinkField.setFieldName(fieldName);
sinkField.setFieldType(fieldType);
@@ -102,7 +102,7 @@ public class OracleStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
OracleSink oracleSink = (OracleSink) response;
- oracleSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ oracleSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
OracleSinkRequest request = CommonBeanUtils.copyProperties(oracleSink,
OracleSinkRequest::new);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
index fed6f8c00..910656d78 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.core.sink;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -72,7 +72,7 @@ public class PostgresStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setPrimaryKey("name,age");
sinkInfo.setSinkName(sinkName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
return sinkService.save(sinkInfo, globalOperator);
}
@@ -99,7 +99,7 @@ public class PostgresStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
PostgresSink postgresSink = (PostgresSink) response;
- postgresSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ postgresSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
SinkRequest request = postgresSink.genSinkRequest();
boolean result = sinkService.update(request, globalOperator);
Assertions.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
index 63ee46745..8775e25fe 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/SqlServerStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.sink.sqlserver.SqlServerSink;
@@ -63,7 +63,7 @@ public class SqlServerStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setPrimaryKey("name,age");
sinkInfo.setSinkName(sinkName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
return sinkService.save(sinkInfo, globalOperator);
}
@@ -90,7 +90,7 @@ public class SqlServerStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
SqlServerSink sqlServerSink = (SqlServerSink) response;
- sqlServerSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ sqlServerSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
SqlServerSinkRequest request = CommonBeanUtils.copyProperties(sqlServerSink,
SqlServerSinkRequest::new);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/TDSQLPostgreSQLStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/TDSQLPostgreSQLStreamSinkServiceTest.java
index 002c7d3cc..e76699a2d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/TDSQLPostgreSQLStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/TDSQLPostgreSQLStreamSinkServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.sink;
-import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -69,7 +69,7 @@ class TDSQLPostgreSQLStreamSinkServiceTest extends ServiceBaseTest {
sinkInfo.setPrimaryKey("name,age");
sinkInfo.setSinkName(sinkName);
- sinkInfo.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
SinkField sinkField = new SinkField();
sinkField.setFieldName(fieldName);
sinkField.setFieldType(fieldType);
@@ -103,7 +103,7 @@ class TDSQLPostgreSQLStreamSinkServiceTest extends ServiceBaseTest {
Assertions.assertEquals(globalGroupId, response.getInlongGroupId());
TDSQLPostgreSQLSink tdsqlPostgreSQLSink = (TDSQLPostgreSQLSink) response;
- tdsqlPostgreSQLSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+ tdsqlPostgreSQLSink.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
TDSQLPostgreSQLSinkRequest request = CommonBeanUtils.copyProperties(tdsqlPostgreSQLSink,
TDSQLPostgreSQLSinkRequest::new);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mq/util/PulsarUtilsTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mq/util/PulsarUtilsTest.java
index 65d9651aa..a99f4a1e4 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mq/util/PulsarUtilsTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/mq/util/PulsarUtilsTest.java
@@ -17,76 +17,33 @@
package org.apache.inlong.manager.service.mq.util;
-import com.google.common.collect.Lists;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
-import java.util.ArrayList;
/**
- * Test class for pulsar utils.
+ * Test class for Pulsar utils.
*/
public class PulsarUtilsTest {
- // There will be concurrency problems in the overall operation,This method temporarily fails the test
- // @Test
+ @Test
public void testGetPulsarAdmin() {
- InlongGroupExtInfo groupExtInfo1 = new InlongGroupExtInfo();
- groupExtInfo1.setId(1);
- groupExtInfo1.setInlongGroupId("group1");
- groupExtInfo1.setKeyName(InlongGroupSettings.PULSAR_ADMIN_URL);
- groupExtInfo1.setKeyValue("http://127.0.0.1:8080");
-
- InlongGroupExtInfo groupExtInfo2 = new InlongGroupExtInfo();
- groupExtInfo2.setId(2);
- groupExtInfo2.setInlongGroupId("group1");
- groupExtInfo2.setKeyName(InlongGroupSettings.PULSAR_AUTHENTICATION);
- groupExtInfo2.setKeyValue("QWEASDZXC");
- ArrayList<InlongGroupExtInfo> groupExtInfoList = Lists.newArrayList(groupExtInfo1, groupExtInfo2);
- InlongPulsarInfo groupInfo = new InlongPulsarInfo();
- groupInfo.setExtList(groupExtInfoList);
-
final String defaultServiceUrl = "http://127.0.0.1:10080";
try {
PulsarAdmin admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
- Assertions.assertEquals("http://127.0.0.1:8080", admin.getServiceUrl());
+ Assertions.assertEquals(defaultServiceUrl, admin.getServiceUrl());
Field auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
assert auth != null;
auth.setAccessible(true);
Authentication authentication = (Authentication) auth.get(admin);
Assertions.assertNotNull(authentication);
-
- InlongGroupExtInfo groupExtInfo3 = new InlongGroupExtInfo();
- groupExtInfo3.setId(3);
- groupExtInfo3.setInlongGroupId("group1");
- groupExtInfo3.setKeyName(InlongGroupSettings.PULSAR_AUTHENTICATION_TYPE);
- groupExtInfo3.setKeyValue("token1");
- groupExtInfoList.add(groupExtInfo3);
- try {
- admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
- } catch (Exception e) {
- if (e instanceof IllegalArgumentException) {
- Assertions.assertTrue(e.getMessage().contains("illegal authentication type"));
- }
- }
-
- groupExtInfoList = new ArrayList<>();
- groupInfo.setExtList(groupExtInfoList);
- admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
- Assertions.assertEquals("http://127.0.0.1:10080", admin.getServiceUrl());
- auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
- assert auth != null;
- auth.setAccessible(true);
- authentication = (Authentication) auth.get(admin);
Assertions.assertTrue(authentication instanceof AuthenticationDisabled);
} catch (PulsarClientException | IllegalAccessException e) {
Assertions.fail();