You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/04 02:14:44 UTC
[inlong] branch master updated: [INLONG-6393][Common] Remove the usage of lombok (#6394)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8b1c8cf78 [INLONG-6393][Common] Remove the usage of lombok (#6394)
8b1c8cf78 is described below
commit 8b1c8cf782405d38afffb37643b616fe273c354d
Author: healchow <he...@gmail.com>
AuthorDate: Fri Nov 4 10:14:37 2022 +0800
[INLONG-6393][Common] Remove the usage of lombok (#6394)
---
.../apache/inlong/agent/core/HeartbeatManager.java | 2 +-
.../inlong/common/enums/ComponentTypeEnum.java | 29 ++++++------
.../inlong/common/enums/DataProxyErrCode.java | 39 +++++++---------
.../apache/inlong/common/enums/DataTypeEnum.java | 26 +++++++----
.../apache/inlong/common/enums/ManagerOpEnum.java | 11 ++---
.../inlong/common/enums/PullJobTypeEnum.java | 8 ++--
.../apache/inlong/common/enums/RowKindEnum.java | 52 +++++++++++-----------
.../apache/inlong/common/enums/TaskTypeEnum.java | 5 ++-
.../dataproxy/heartbeat/HeartbeatManager.java | 2 +-
.../dataproxy/sink/pulsar/PulsarClientService.java | 2 +-
.../dataproxy/source/ServerMessageHandler.java | 2 +-
.../client/api/inner/HeartbeatClientTest.java | 24 +++++-----
.../manager/pojo/sort/util/ExtractNodeUtils.java | 4 +-
.../manager/pojo/sort/util/LoadNodeUtils.java | 4 +-
.../service/heartbeat/HeartbeatServiceImpl.java | 14 +++---
.../source/pulsar/PulsarSourceOperator.java | 4 +-
.../service/cluster/InlongClusterServiceTest.java | 4 +-
.../core/heartbeat/HeartbeatManagerTest.java | 2 +-
.../service/core/impl/HeartbeatServiceTest.java | 4 +-
19 files changed, 125 insertions(+), 113 deletions(-)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 9c8f54db3..ffae9eb4d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -187,7 +187,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
heartbeatMsg.setIp(agentIp);
heartbeatMsg.setPort(String.valueOf(agentPort));
- heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getName());
+ heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getType());
heartbeatMsg.setReportTime(System.currentTimeMillis());
if (StringUtils.isNotBlank(clusterName)) {
heartbeatMsg.setClusterName(clusterName);
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
index d12679dbc..558f1bf02 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
@@ -17,33 +17,36 @@
package org.apache.inlong.common.enums;
-import lombok.Getter;
-
+/**
+ * Enum of inlong component type.
+ */
public enum ComponentTypeEnum {
Agent("AGENT"),
-
DataProxy("DATAPROXY"),
-
Cache("CACHE"),
-
Sort("SORT"),
+ SDK("SDK"),
- SDK("SDK");
+ ;
- @Getter
- private final String name;
+ private final String type;
- ComponentTypeEnum(String name) {
- this.name = name;
+ ComponentTypeEnum(String type) {
+ this.type = type;
}
- public static ComponentTypeEnum forName(String name) {
+ public static ComponentTypeEnum forType(String type) {
for (ComponentTypeEnum componentType : values()) {
- if (componentType.getName().equals(name)) {
+ if (componentType.getType().equals(type)) {
return componentType;
}
}
- throw new IllegalArgumentException(String.format("Unsupport componentName for Inlong:%s", name));
+ throw new IllegalArgumentException("Unsupported component type for " + type);
+ }
+
+ public String getType() {
+ return type;
}
+
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index 463e93b5e..8943ebfd3 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -18,31 +18,26 @@
package org.apache.inlong.common.enums;
+/**
+ * Enum of data proxy error code.
+ */
public enum DataProxyErrCode {
SUCCESS(0, "Ok"),
- UNSUPPORTED_MSGTYPE(1, "Unsupported msgType"),
+ UNSUPPORTED_MSG_TYPE(1, "Unsupported msgType"),
EMPTY_MSG(2, "Empty message"),
- UNSUPPORTED_EXTENDFIELD_VALUE(3,
- "Unsupported extend field value"),
- UNCONFIGURED_GROUPID_OR_STREAMID(4,
- "Unconfigured groupId or streamId"),
- PUT_EVENT_TO_CHANNEL_FAILURE(5,
- "Put event to Channels failure"),
- TOPIC_IS_BLANK(6,
- "Topic is null"),
- NO_AVAILABLE_PRODUCERINFO(7,
- "No available producer info"),
- PRODUCER_IS_NULL(8,
- "Producer is null"),
- SEND_REQUEST_TO_MQ_FAILURE(9,
- "Send request to MQ failure"),
- MQ_RETURN_ERROR(10,
- "MQ client return error"),
- DUPLICATED_MESSAGE(11,
- "Duplicated message"),
+ UNSUPPORTED_EXTEND_FIELD_VALUE(3, "Unsupported extend field value"),
+ UNCONFIGURED_GROUPID_OR_STREAMID(4, "Unconfigured groupId or streamId"),
+ PUT_EVENT_TO_CHANNEL_FAILURE(5, "Put event to Channels failure"),
+
+ TOPIC_IS_BLANK(6, "Topic is null"),
+ NO_AVAILABLE_PRODUCER(7, "No available producer info"),
+ PRODUCER_IS_NULL(8, "Producer is null"),
+ SEND_REQUEST_TO_MQ_FAILURE(9, "Send request to MQ failure"),
+ MQ_RETURN_ERROR(10, "MQ client return error"),
+ DUPLICATED_MESSAGE(11, "Duplicated message"),
UNKNOWN_ERROR(Integer.MAX_VALUE, "Unknown error");
@@ -55,9 +50,9 @@ public enum DataProxyErrCode {
}
public static DataProxyErrCode valueOf(int value) {
- for (DataProxyErrCode msgErrCode : DataProxyErrCode.values()) {
- if (msgErrCode.getErrCode() == value) {
- return msgErrCode;
+ for (DataProxyErrCode errCode : DataProxyErrCode.values()) {
+ if (errCode.getErrCode() == value) {
+ return errCode;
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index e7cbbe9f5..2d2d8d199 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -18,29 +18,37 @@
package org.apache.inlong.common.enums;
import java.util.Locale;
-import lombok.Getter;
+/**
+ * Enum of data type.
+ */
public enum DataTypeEnum {
+
CSV("csv"),
AVRO("avro"),
JSON("json"),
CANAL("canal"),
DEBEZIUM_JSON("debezium_json"),
- RAW("raw");
+ RAW("raw"),
+
+ ;
- @Getter
- private final String name;
+ private final String type;
- DataTypeEnum(String name) {
- this.name = name;
+ DataTypeEnum(String type) {
+ this.type = type;
}
- public static DataTypeEnum forName(String name) {
+ public static DataTypeEnum forType(String type) {
for (DataTypeEnum dataType : values()) {
- if (dataType.getName().equals(name.toLowerCase(Locale.ROOT))) {
+ if (dataType.getType().equals(type.toLowerCase(Locale.ROOT))) {
return dataType;
}
}
- throw new IllegalArgumentException(String.format("Unsupport dataType for Inlong:%s", name));
+ throw new IllegalArgumentException("Unsupported data type for " + type);
+ }
+
+ public String getType() {
+ return type;
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
index c63371feb..802b9211b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/ManagerOpEnum.java
@@ -17,20 +17,21 @@
package org.apache.inlong.common.enums;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * Enum of operation type.
+ */
public enum ManagerOpEnum {
+
ADD(0), DEL(1), RETRY(2), BACKTRACK(3), FROZEN(4),
ACTIVE(5), CHECK(6), REDOMETRIC(7), MAKEUP(8);
- private int type;
+ private final int type;
ManagerOpEnum(int type) {
this.type = type;
}
public static ManagerOpEnum getOpType(int opType) {
- requireNonNull(opType);
switch (opType) {
case 0:
return ADD;
@@ -51,7 +52,7 @@ public enum ManagerOpEnum {
case 8:
return MAKEUP;
default:
- throw new RuntimeException("such op type doesn't exist");
+ throw new RuntimeException("Unsupported op type for " + opType);
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/PullJobTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/PullJobTypeEnum.java
index 0b2215644..ff7aa9dd1 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/PullJobTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/PullJobTypeEnum.java
@@ -17,20 +17,20 @@
package org.apache.inlong.common.enums;
-import static java.util.Objects.requireNonNull;
-
+/**
+ * Enum of pull job type.
+ */
public enum PullJobTypeEnum {
NEW(0), NEVER(1);
- private int type;
+ private final int type;
PullJobTypeEnum(int type) {
this.type = type;
}
public static PullJobTypeEnum getPullJobType(int type) {
- requireNonNull(type);
switch (type) {
case 0:
return NEW;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java
index 296c86807..5f4dda497 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/RowKindEnum.java
@@ -22,7 +22,9 @@ package org.apache.inlong.common.enums;
*/
public enum RowKindEnum {
- /** Insertion operation. */
+ /**
+ * Insertion operation.
+ */
INSERT("+I", (byte) 0),
/**
@@ -32,11 +34,12 @@ public enum RowKindEnum {
/**
* Update operation with new content of the updated row.
- *
*/
UPDATE_AFTER("+U", (byte) 2),
- /** Deletion operation. */
+ /**
+ * Deletion operation.
+ */
DELETE("-D", (byte) 3);
private final String shortString;
@@ -52,6 +55,27 @@ public enum RowKindEnum {
this.value = value;
}
+ /**
+ * Creates a {@link RowKindEnum} from the given byte value. Each {@link RowKindEnum} has a byte value
+ * representation.
+ *
+ * @see #toByteValue() for mapping of byte value and {@link RowKindEnum}.
+ */
+ public static RowKindEnum fromByteValue(byte value) {
+ switch (value) {
+ case 0:
+ return INSERT;
+ case 1:
+ return UPDATE_BEFORE;
+ case 2:
+ return UPDATE_AFTER;
+ case 3:
+ return DELETE;
+ default:
+ throw new UnsupportedOperationException("Unsupported byte value '" + value + "' for row kind.");
+ }
+ }
+
/**
* Returns a short string representation of this {@link RowKindEnum}.
*
@@ -83,26 +107,4 @@ public enum RowKindEnum {
return value;
}
- /**
- * Creates a {@link RowKindEnum} from the given byte value. Each {@link RowKindEnum} has a byte value
- * representation.
- *
- * @see #toByteValue() for mapping of byte value and {@link RowKindEnum}.
- */
- public static RowKindEnum fromByteValue(byte value) {
- switch (value) {
- case 0:
- return INSERT;
- case 1:
- return UPDATE_BEFORE;
- case 2:
- return UPDATE_AFTER;
- case 3:
- return DELETE;
- default:
- throw new UnsupportedOperationException(
- "Unsupported byte value '" + value + "' for row kind.");
- }
- }
-
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 087d339b0..c4b5c75b9 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -17,6 +17,9 @@
package org.apache.inlong.common.enums;
+/**
+ * Enum of task type.
+ */
public enum TaskTypeEnum {
DATABASE_MIGRATION(0),
@@ -68,7 +71,7 @@ public enum TaskTypeEnum {
case 12:
return MQTT;
default:
- throw new RuntimeException(String.format("Unsupported taskType=%s", taskType));
+ throw new RuntimeException("Unsupported task type " + taskType);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 7ddb00337..fb6c025c1 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -130,7 +130,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager {
heartbeatMsg.setIp(reportInfo.getIp());
heartbeatMsg.setPort(reportInfo.getPort());
heartbeatMsg.setProtocolType(reportInfo.getProtocolType());
- heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getName());
+ heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getType());
heartbeatMsg.setReportTime(System.currentTimeMillis());
Map<String, String> commonProperties = configManager.getCommonProperties();
heartbeatMsg.setClusterTag(commonProperties.getOrDefault(
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index 5bcd6bde9..cca008c19 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -162,7 +162,7 @@ public class PulsarClientService {
* put it back into the illegal map
*/
pulsarSink.handleRequestProcError(topic, es,
- false, DataProxyErrCode.NO_AVAILABLE_PRODUCERINFO, errMsg);
+ false, DataProxyErrCode.NO_AVAILABLE_PRODUCER, errMsg);
return false;
}
TopicProducerInfo forCallBackP = producerInfo;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 793486890..01767ffb9 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -295,7 +295,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
if (commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
|| commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE,
- DataProxyErrCode.UNSUPPORTED_EXTENDFIELD_VALUE.getErrCodeStr());
+ DataProxyErrCode.UNSUPPORTED_EXTEND_FIELD_VALUE.getErrCodeStr());
MessageUtils.sourceReturnRspPackage(
commonAttrMap, resultMap, remoteChannel, msgType);
return;
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/HeartbeatClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/HeartbeatClientTest.java
index f9a42bd0e..c6c00bfd8 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/HeartbeatClientTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/HeartbeatClientTest.java
@@ -48,7 +48,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
@Test
void testGetComponent() {
ComponentHeartbeatResponse response = ComponentHeartbeatResponse.builder()
- .component(ComponentTypeEnum.Agent.getName())
+ .component(ComponentTypeEnum.Agent.getType())
.instance("127.0.0.1")
.build();
@@ -62,7 +62,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
);
HeartbeatQueryRequest request = new HeartbeatQueryRequest();
- request.setComponent(ComponentTypeEnum.Agent.getName());
+ request.setComponent(ComponentTypeEnum.Agent.getType());
request.setInstance("127.0.0.1");
ComponentHeartbeatResponse result = heartbeatClient.getComponentHeartbeat(request);
Assertions.assertEquals(request.getComponent(), result.getComponent());
@@ -72,7 +72,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
@Test
void testGetGroup() {
GroupHeartbeatResponse response = new GroupHeartbeatResponse();
- response.setComponent(ComponentTypeEnum.Agent.getName());
+ response.setComponent(ComponentTypeEnum.Agent.getType());
response.setInstance("127.0.0.1");
response.setInlongGroupId("test_group");
@@ -86,7 +86,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
);
HeartbeatQueryRequest request = new HeartbeatQueryRequest();
- request.setComponent(ComponentTypeEnum.Agent.getName());
+ request.setComponent(ComponentTypeEnum.Agent.getType());
request.setInstance("127.0.0.1");
request.setInlongGroupId("test_group");
GroupHeartbeatResponse result = heartbeatClient.getGroupHeartbeat(request);
@@ -98,7 +98,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
@Test
void testGetStream() {
StreamHeartbeatResponse response = new StreamHeartbeatResponse();
- response.setComponent(ComponentTypeEnum.Agent.getName());
+ response.setComponent(ComponentTypeEnum.Agent.getType());
response.setInstance("127.0.0.1");
response.setInlongGroupId("test_group");
response.setInlongStreamId("test_stream");
@@ -113,7 +113,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
);
HeartbeatQueryRequest request = new HeartbeatQueryRequest();
- request.setComponent(ComponentTypeEnum.Agent.getName());
+ request.setComponent(ComponentTypeEnum.Agent.getType());
request.setInstance("127.0.0.1");
request.setInlongGroupId("test_group");
request.setInlongStreamId("test_stream");
@@ -128,7 +128,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
void testListComponent() {
List<ComponentHeartbeatResponse> responses = Lists.newArrayList(
ComponentHeartbeatResponse.builder()
- .component(ComponentTypeEnum.Agent.getName())
+ .component(ComponentTypeEnum.Agent.getType())
.instance("127.0.0.1")
.build()
);
@@ -144,7 +144,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
);
HeartbeatPageRequest request = new HeartbeatPageRequest();
- request.setComponent(ComponentTypeEnum.Agent.getName());
+ request.setComponent(ComponentTypeEnum.Agent.getType());
PageResult<ComponentHeartbeatResponse> pageResult = heartbeatClient.listComponentHeartbeat(request);
Assertions.assertEquals(JsonUtils.toJsonString(responses),JsonUtils.toJsonString(pageResult.getList()));
}
@@ -153,7 +153,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
void testListGroup() {
List<GroupHeartbeatResponse> responses = Lists.newArrayList(
GroupHeartbeatResponse.builder()
- .component(ComponentTypeEnum.Agent.getName())
+ .component(ComponentTypeEnum.Agent.getType())
.instance("127.0.0.1")
.build()
);
@@ -169,7 +169,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
);
HeartbeatPageRequest request = new HeartbeatPageRequest();
- request.setComponent(ComponentTypeEnum.Agent.getName());
+ request.setComponent(ComponentTypeEnum.Agent.getType());
PageResult<GroupHeartbeatResponse> pageResult = heartbeatClient.listGroupHeartbeat(request);
Assertions.assertEquals(JsonUtils.toJsonString(responses),JsonUtils.toJsonString(pageResult.getList()));
}
@@ -178,7 +178,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
void testListStream() {
List<StreamHeartbeatResponse> responses = Lists.newArrayList(
StreamHeartbeatResponse.builder()
- .component(ComponentTypeEnum.Agent.getName())
+ .component(ComponentTypeEnum.Agent.getType())
.inlongGroupId("test_group")
.inlongStreamId("test_stream")
.instance("127.0.0.1")
@@ -196,7 +196,7 @@ public class HeartbeatClientTest extends ClientFactoryTest {
);
HeartbeatPageRequest request = new HeartbeatPageRequest();
- request.setComponent(ComponentTypeEnum.Agent.getName());
+ request.setComponent(ComponentTypeEnum.Agent.getType());
request.setInlongGroupId("test_group");
PageResult<StreamHeartbeatResponse> pageResult = heartbeatClient.listStreamHeartbeat(request);
Assertions.assertEquals(JsonUtils.toJsonString(responses),JsonUtils.toJsonString(pageResult.getList()));
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index 808eea29c..09614ebbc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -172,7 +172,7 @@ public class ExtractNodeUtils {
String topic = kafkaSource.getTopic();
String bootstrapServers = kafkaSource.getBootstrapServers();
Format format;
- DataTypeEnum dataType = DataTypeEnum.forName(kafkaSource.getSerializationType());
+ DataTypeEnum dataType = DataTypeEnum.forType(kafkaSource.getSerializationType());
switch (dataType) {
case CSV:
format = new CsvFormat();
@@ -244,7 +244,7 @@ public class ExtractNodeUtils {
pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
Format format;
- DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
+ DataTypeEnum dataType = DataTypeEnum.forType(pulsarSource.getSerializationType());
switch (dataType) {
case CSV:
String separatorStr = pulsarSource.getDataSeparator();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 00e6eadf6..028fb0875 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -154,7 +154,7 @@ public class LoadNodeUtils {
if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) {
sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum());
}
- DataTypeEnum dataType = DataTypeEnum.forName(kafkaSink.getSerializationType());
+ DataTypeEnum dataType = DataTypeEnum.forType(kafkaSink.getSerializationType());
Format format;
switch (dataType) {
case CSV:
@@ -303,7 +303,7 @@ public class LoadNodeUtils {
Format format = null;
if (dorisSink.getSinkMultipleEnable() != null && dorisSink.getSinkMultipleEnable() && StringUtils.isNotBlank(
dorisSink.getSinkMultipleFormat())) {
- DataTypeEnum dataType = DataTypeEnum.forName(dorisSink.getSinkMultipleFormat());
+ DataTypeEnum dataType = DataTypeEnum.forType(dorisSink.getSinkMultipleFormat());
switch (dataType) {
case CANAL:
format = new CanalJsonFormat();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
index af644a057..44b1fa6e4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
@@ -77,7 +77,7 @@ public class HeartbeatServiceImpl implements HeartbeatService {
log.debug("received heartbeat: " + request);
}
heartbeatManager.reportHeartbeat(request);
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(request.getComponentType());
+ ComponentTypeEnum componentType = ComponentTypeEnum.forType(request.getComponentType());
switch (componentType) {
case Sort:
case DataProxy:
@@ -98,7 +98,7 @@ public class HeartbeatServiceImpl implements HeartbeatService {
Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
Preconditions.checkNotEmpty(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(component);
+ ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
@@ -120,7 +120,7 @@ public class HeartbeatServiceImpl implements HeartbeatService {
Preconditions.checkNotEmpty(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(component);
+ ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
@@ -144,7 +144,7 @@ public class HeartbeatServiceImpl implements HeartbeatService {
Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
Preconditions.checkNotEmpty(request.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(component);
+ ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
@@ -165,7 +165,7 @@ public class HeartbeatServiceImpl implements HeartbeatService {
String component = request.getComponent();
Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(component);
+ ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
@@ -184,7 +184,7 @@ public class HeartbeatServiceImpl implements HeartbeatService {
String component = request.getComponent();
Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(component);
+ ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
@@ -204,7 +204,7 @@ public class HeartbeatServiceImpl implements HeartbeatService {
Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
- ComponentTypeEnum componentType = ComponentTypeEnum.forName(component);
+ ComponentTypeEnum componentType = ComponentTypeEnum.forType(component);
switch (componentType) {
case Sort:
case DataProxy:
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 1b89c4b87..12ebe6200 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -149,9 +149,9 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
// if the SerializationType is still null, set it to the CSV
if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
- pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
+ pulsarSource.setSerializationType(DataTypeEnum.CSV.getType());
}
- if (DataTypeEnum.CSV.getName().equalsIgnoreCase(pulsarSource.getSerializationType())) {
+ if (DataTypeEnum.CSV.getType().equalsIgnoreCase(pulsarSource.getSerializationType())) {
pulsarSource.setDataSeparator(streamInfo.getDataSeparator());
if (StringUtils.isEmpty(pulsarSource.getDataSeparator())) {
pulsarSource.setDataSeparator(String.valueOf((int) ','));
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index f5383dc18..73ce75e13 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -322,10 +322,10 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
// report heartbeat
HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, String.valueOf(port1),
- ComponentTypeEnum.DataProxy.getName());
+ ComponentTypeEnum.DataProxy.getType());
heartbeatManager.reportHeartbeat(msg1);
HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, String.valueOf(port2),
- ComponentTypeEnum.DataProxy.getName());
+ ComponentTypeEnum.DataProxy.getType());
heartbeatManager.reportHeartbeat(msg2);
// create an inlong group which use the clusterTag
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
index 353c6a629..feb54595d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
@@ -88,7 +88,7 @@ public class HeartbeatManagerTest extends ServiceBaseTest {
heartbeatMsg.setPort("46802");
heartbeatMsg.setClusterTag("default_cluster");
heartbeatMsg.setProtocolType(ProtocolType.HTTP);
- heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getName());
+ heartbeatMsg.setComponentType(ComponentTypeEnum.DataProxy.getType());
heartbeatMsg.setReportTime(System.currentTimeMillis());
return heartbeatMsg;
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
index acc3ec985..5d33db4f7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
@@ -51,7 +51,7 @@ public class HeartbeatServiceTest extends ServiceBaseTest {
@Test
public void testReportHeartbeat() {
HeartbeatReportRequest request = new HeartbeatReportRequest();
- request.setComponentType(ComponentTypeEnum.DataProxy.getName());
+ request.setComponentType(ComponentTypeEnum.DataProxy.getType());
request.setIp("127.0.0.1");
request.setPort("56802");
request.setClusterTag("default_cluster");
@@ -80,7 +80,7 @@ public class HeartbeatServiceTest extends ServiceBaseTest {
@Test
public void testGetStreamHeartbeat() {
HeartbeatQueryRequest request = new HeartbeatQueryRequest();
- request.setComponent(ComponentTypeEnum.DataProxy.getName());
+ request.setComponent(ComponentTypeEnum.DataProxy.getType());
request.setInstance("127.0.0.1");
request.setInlongGroupId("group1");
request.setInlongStreamId("stream1");