You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/11 02:26:00 UTC

[incubator-inlong] branch master updated: [INLONG-3589][Manager] Add Iceberg sink info for Sort (#3590)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3693de893 [INLONG-3589][Manager] Add Iceberg sink info for Sort (#3590)
3693de893 is described below

commit 3693de89319bcc150ada7b59bedcfae08c5205af
Author: healchow <he...@gmail.com>
AuthorDate: Mon Apr 11 10:25:54 2022 +0800

    [INLONG-3589][Manager] Add Iceberg sink info for Sort (#3590)
---
 .../inlong/manager/common/enums/Constant.java      | 12 --------
 .../inlong/manager/common/enums/SinkType.java      | 10 ++++++-
 .../inlong/manager/common/enums/SourceType.java    |  4 +--
 .../manager/common/pojo/sink/SinkFieldBase.java    |  8 +++---
 .../common/pojo/sink/ck/ClickHouseSinkRequest.java |  4 +--
 .../pojo/sink/ck/ClickHouseSinkResponse.java       |  4 +--
 .../common/pojo/sink/hive/HiveSinkRequest.java     |  4 +--
 .../common/pojo/sink/hive/HiveSinkResponse.java    |  4 +--
 .../pojo/sink/iceberg/IcebergSinkRequest.java      |  4 +--
 .../pojo/sink/iceberg/IcebergSinkResponse.java     |  4 +--
 .../common/pojo/sink/kafka/KafkaSinkRequest.java   |  4 +--
 .../common/pojo/sink/kafka/KafkaSinkResponse.java  |  4 +--
 .../common/pojo/source/kafka/KafkaSourceDTO.java   |  4 ++-
 .../manager/dao/entity/StreamSinkFieldEntity.java  |  2 +-
 .../service/sink/StreamSinkServiceImpl.java        |  4 +--
 ...Operation.java => ClickHouseSinkOperation.java} | 32 +++++++++++-----------
 ...amSinkOperation.java => HiveSinkOperation.java} | 19 ++++++-------
 ...inkOperation.java => IcebergSinkOperation.java} | 21 +++++++-------
 ...mSinkOperation.java => KafkaSinkOperation.java} | 22 +++++++--------
 ...Operation.java => AbstractSourceOperation.java} |  4 +--
 ...ceOperation.java => BinlogSourceOperation.java} |  4 +--
 ...urceOperation.java => FileSourceOperation.java} |  4 +--
 ...rceOperation.java => KafkaSourceOperation.java} |  4 +--
 .../hive/CreateHiveTableEventSelector.java         |  6 ++--
 .../thirdparty/sort/util/SinkInfoUtils.java        | 29 ++++++++++++++------
 .../stream/CreateStreamWorkflowDefinition.java     |  4 +--
 .../core/sink/ClickHouseStreamSinkServiceTest.java |  9 +++---
 .../core/sink/HiveStreamSinkServiceTest.java       | 11 ++++----
 .../core/sink/IcebergStreamSinkServiceTest.java    | 11 ++++----
 .../core/sink/KafkaStreamSinkServiceTest.java      |  9 +++---
 30 files changed, 138 insertions(+), 127 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 44c185d71..778a796ff 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -28,14 +28,6 @@ public class Constant {
     public static final String URL_SPLITTER = ",";
     public static final String HOST_SPLITTER = ":";
 
-    public static final String SINK_HIVE = "HIVE";
-
-    public static final String SINK_KAFKA = "KAFKA";
-
-    public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
-
-    public static final String SINK_ICEBERG = "ICEBERG";
-
     public static final String DATA_SOURCE_DB = "DB";
 
     public static final String DATA_SOURCE_FILE = "FILE";
@@ -69,10 +61,6 @@ public class Constant {
 
     public static final String REQUEST_IS_EMPTY = "request is empty";
 
-    public static final String SINK_TYPE_IS_EMPTY = "sinkType is empty";
-
-    public static final String SINK_TYPE_NOT_SAME = "Expected sinkType is %s, but found %s";
-
     public static final String PULSAR_TOPIC_TYPE_SERIAL = "SERIAL";
 
     public static final String PULSAR_TOPIC_TYPE_PARALLEL = "PARALLEL";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index 19a547bfa..3eea7f22a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -21,7 +21,15 @@ import java.util.Locale;
 
 public enum SinkType {
 
-    HIVE, ES, CLICKHOUSE, ICEBERG, KAFKA;
+    HIVE, KAFKA, ICEBERG, CLICKHOUSE;
+
+    public static final String SINK_HIVE = "HIVE";
+    public static final String SINK_KAFKA = "KAFKA";
+    public static final String SINK_ICEBERG = "ICEBERG";
+    public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
+
+    public static final String SINK_TYPE_IS_EMPTY = "Sink type is empty";
+    public static final String SINK_TYPE_NOT_SAME = "Expected sink type is %s, but found %s";
 
     /**
      * Get the SinkType enum via the given sinkType string
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 33ecdca8f..42bfd9b91 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -37,8 +37,8 @@ public enum SourceType {
     public static final String SOURCE_SQL = "SQL";
     public static final String SOURCE_BINLOG = "BINLOG";
     public static final String SOURCE_KAFKA = "KAFKA";
-    public static final String SOURCE_TYPE_IS_EMPTY = "sourceType is empty";
-    public static final String SOURCE_TYPE_NOT_SAME = "Expected sourceType is %s, but found %s";
+    public static final String SOURCE_TYPE_IS_EMPTY = "Source type is empty";
+    public static final String SOURCE_TYPE_NOT_SAME = "Expected source type is %s, but found %s";
 
     @Getter
     private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
index ff355e32c..42a0efecf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkFieldBase.java
@@ -43,16 +43,16 @@ public class SinkFieldBase {
     @ApiModelProperty("Source field type")
     private String sourceFieldType;
 
-    @ApiModelProperty("Field length")
+    @ApiModelProperty("Length of fixed type")
     private Integer fieldLength;
 
-    @ApiModelProperty("Field precision")
+    @ApiModelProperty("Precision of decimal type, that is, field length. precision >= scale")
     private Integer fieldPrecision;
 
-    @ApiModelProperty("Field scale")
+    @ApiModelProperty("Range of decimal type, that is, the number of decimal places. precision >= scale")
     private Integer fieldScale;
 
-    @ApiModelProperty("Partition strategy, including: Identity, Year, Month, Day, Hour, Bucket")
+    @ApiModelProperty("Partition strategy, including: None, Identity, Year, Month, Day, Hour, Bucket, Truncate")
     private String partitionStrategy;
 
     @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601"
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
index 2f28191d2..6493ae5c3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkRequest.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Request of the ClickHouse sink info")
-@JsonTypeDefine(value = Constant.SINK_CLICKHOUSE)
+@JsonTypeDefine(value = SinkType.SINK_CLICKHOUSE)
 public class ClickHouseSinkRequest extends SinkRequest {
 
     @ApiModelProperty("ClickHouse JDBC URL")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
index 0e8ea89f1..bb0e47654 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 
 /**
@@ -60,7 +60,7 @@ public class ClickHouseSinkResponse extends SinkResponse {
     private Integer writeMaxRetryTimes;
 
     public ClickHouseSinkResponse() {
-        this.sinkType = Constant.SINK_CLICKHOUSE;
+        this.sinkType = SinkType.SINK_CLICKHOUSE;
     }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
index 5fa303f1e..80d8dc52b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkRequest.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 
@@ -35,7 +35,7 @@ import java.util.List;
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Request of the Hive sink info")
-@JsonTypeDefine(value = Constant.SINK_HIVE)
+@JsonTypeDefine(value = SinkType.SINK_HIVE)
 public class HiveSinkRequest extends SinkRequest {
 
     @ApiModelProperty("Whether to enable create table, 1: enable, 0: disable, default is 1")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
index 1b597f91d..d055c3bf4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 
 import java.util.List;
@@ -73,7 +73,7 @@ public class HiveSinkResponse extends SinkResponse {
     private String dataSeparator;
 
     public HiveSinkResponse() {
-        this.sinkType = Constant.SINK_HIVE;
+        this.sinkType = SinkType.SINK_HIVE;
     }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
index 1fb0a24f4..6938daed2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkRequest.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Request of the Iceberg sink info")
-@JsonTypeDefine(value = Constant.SINK_ICEBERG)
+@JsonTypeDefine(value = SinkType.SINK_ICEBERG)
 public class IcebergSinkRequest extends SinkRequest {
 
     @ApiModelProperty("Hive JDBC URL")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java
index f40ceb8bc..8b2247b26 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 
 /**
@@ -65,6 +65,6 @@ public class IcebergSinkResponse extends SinkResponse {
     private String dataConsistency;
 
     public IcebergSinkResponse() {
-        this.sinkType = Constant.SINK_ICEBERG;
+        this.sinkType = SinkType.SINK_ICEBERG;
     }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
index 791f8dc08..49fb9f81f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Request of the Kafka sink info")
-@JsonTypeDefine(value = Constant.SINK_KAFKA)
+@JsonTypeDefine(value = SinkType.SINK_KAFKA)
 public class KafkaSinkRequest extends SinkRequest {
 
     @ApiModelProperty("Kafka bootstrap servers")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
index f7929445f..e6f62f4f6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 
 /**
@@ -51,7 +51,7 @@ public class KafkaSinkResponse extends SinkResponse {
     private String autoOffsetReset;
 
     public KafkaSinkResponse() {
-        this.sinkType = Constant.SINK_KAFKA;
+        this.sinkType = SinkType.SINK_KAFKA;
     }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 2c5c28f48..4eb227f5c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -32,10 +32,10 @@ import javax.validation.constraints.NotNull;
 /**
  * kafka source information data transfer object.
  */
+@Data
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
-@Data
 public class KafkaSourceDTO {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -62,6 +62,8 @@ public class KafkaSourceDTO {
     private String topicPartitionOffset;
 
     /**
+     * The strategy of auto offset reset.
+     *
      * @see <a href="https://docs.confluent.io/platform/current/clients/consumer.html">Kafka_consumer_config</a>
      */
     @ApiModelProperty(value = "The strategy of auto offset reset",
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
index 00dee4daf..89ccebce2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkFieldEntity.java
@@ -41,7 +41,7 @@ public class StreamSinkFieldEntity implements Serializable {
     private Integer fieldLength;
     private Integer fieldPrecision;
     private Integer fieldScale;
-    private String partitionStrategy; // For Iceberg, including: Identity, Year, Month, Day, Hour, Bucket
+    private String partitionStrategy;
 
     private Integer isMetaField;
     private String fieldFormat;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index f030959bd..f49f12508 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -333,7 +333,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         for (SinkApproveDTO dto : approveList) {
             // According to the sink type, save sink information
             String sinkType = dto.getSinkType();
-            Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
+            Preconditions.checkNotNull(sinkType, SinkType.SINK_TYPE_IS_EMPTY);
 
             StreamSinkEntity entity = new StreamSinkEntity();
             entity.setId(dto.getId());
@@ -357,7 +357,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         String streamId = request.getInlongStreamId();
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
         String sinkType = request.getSinkType();
-        Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
+        Preconditions.checkNotNull(sinkType, SinkType.SINK_TYPE_IS_EMPTY);
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
similarity index 93%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseStreamSinkOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
index 2be4ba955..6aff31f3c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
@@ -20,16 +20,10 @@ package org.apache.inlong.manager.service.sink.ck;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageInfo;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.function.Supplier;
-import javax.validation.constraints.NotNull;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
@@ -53,13 +47,19 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+
 /**
  * ClickHouse sink operation
  */
 @Service
-public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
+public class ClickHouseSinkOperation implements StreamSinkOperation {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseStreamSinkOperation.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkOperation.class);
 
     @Autowired
     private ObjectMapper objectMapper;
@@ -76,7 +76,7 @@ public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
     @Override
     public Integer saveOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_CLICKHOUSE.equals(sinkType),
+        Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(sinkType),
                 ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
 
         ClickHouseSinkRequest sinkRequest = (ClickHouseSinkRequest) request;
@@ -141,8 +141,8 @@ public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_CLICKHOUSE.equals(existType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_CLICKHOUSE, existType));
+        Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, existType));
 
         SinkResponse response = this.getFromEntity(entity, ClickHouseSinkResponse::new);
         List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -161,8 +161,8 @@ public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
         }
 
         String existType = entity.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_CLICKHOUSE.equals(existType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_CLICKHOUSE, existType));
+        Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, existType));
 
         ClickHouseSinkDTO dto = ClickHouseSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
@@ -182,8 +182,8 @@ public class ClickHouseStreamSinkOperation implements StreamSinkOperation {
     @Override
     public void updateOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_CLICKHOUSE.equals(sinkType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_CLICKHOUSE, sinkType));
+        Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(sinkType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, sinkType));
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
similarity index 93%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
index 92eea579c..dc1b3aa09 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
@@ -22,7 +22,6 @@ import com.github.pagehelper.Page;
 import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SinkType;
@@ -59,9 +58,9 @@ import java.util.function.Supplier;
  * Hive sink operation
  */
 @Service
-public class HiveStreamSinkOperation implements StreamSinkOperation {
+public class HiveSinkOperation implements StreamSinkOperation {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(HiveStreamSinkOperation.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(HiveSinkOperation.class);
 
     @Autowired
     private ObjectMapper objectMapper;
@@ -78,7 +77,7 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
     @Override
     public Integer saveOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_HIVE.equals(sinkType),
+        Preconditions.checkTrue(SinkType.SINK_HIVE.equals(sinkType),
                 ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
         HiveSinkRequest hiveRequest = (HiveSinkRequest) request;
         SinkInfoUtils.checkPartitionField(hiveRequest.getFieldList(), hiveRequest.getPartitionFieldList());
@@ -142,8 +141,8 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_HIVE.equals(existType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_HIVE, existType));
+        Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, existType));
 
         SinkResponse response = this.getFromEntity(entity, HiveSinkResponse::new);
         List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -162,8 +161,8 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
         }
 
         String existType = entity.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_HIVE.equals(existType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_HIVE, existType));
+        Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, existType));
 
         HiveSinkDTO dto = HiveSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
@@ -183,8 +182,8 @@ public class HiveStreamSinkOperation implements StreamSinkOperation {
     @Override
     public void updateOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_HIVE.equals(sinkType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_HIVE, sinkType));
+        Preconditions.checkTrue(SinkType.SINK_HIVE.equals(sinkType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, sinkType));
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
similarity index 93%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergStreamSinkOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
index 284cbf7ad..d09fa3ea3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
@@ -22,9 +22,8 @@ import com.github.pagehelper.Page;
 import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
@@ -54,9 +53,9 @@ import java.util.List;
 import java.util.function.Supplier;
 
 @Service
-public class IcebergStreamSinkOperation implements StreamSinkOperation {
+public class IcebergSinkOperation implements StreamSinkOperation {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(IcebergStreamSinkOperation.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(IcebergSinkOperation.class);
 
     @Autowired
     private ObjectMapper objectMapper;
@@ -73,7 +72,7 @@ public class IcebergStreamSinkOperation implements StreamSinkOperation {
     @Override
     public Integer saveOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_ICEBERG.equals(sinkType),
+        Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(sinkType),
                 ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
 
         IcebergSinkRequest icebergSinkRequest = (IcebergSinkRequest) request;
@@ -136,8 +135,8 @@ public class IcebergStreamSinkOperation implements StreamSinkOperation {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_ICEBERG.equals(existType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_ICEBERG, existType));
+        Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, existType));
 
         SinkResponse response = this.getFromEntity(entity, IcebergSinkResponse::new);
         List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -156,8 +155,8 @@ public class IcebergStreamSinkOperation implements StreamSinkOperation {
         }
 
         String existType = entity.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_ICEBERG.equals(existType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_ICEBERG, existType));
+        Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, existType));
 
         IcebergSinkDTO dto = IcebergSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
@@ -177,8 +176,8 @@ public class IcebergStreamSinkOperation implements StreamSinkOperation {
     @Override
     public void updateOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_ICEBERG.equals(sinkType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_ICEBERG, sinkType));
+        Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(sinkType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, sinkType));
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaStreamSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
similarity index 92%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaStreamSinkOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
index c1cdf4926..e8a4a169c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaStreamSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
@@ -22,7 +22,6 @@ import com.github.pagehelper.Page;
 import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SinkType;
@@ -58,9 +57,9 @@ import java.util.function.Supplier;
  * Kafka sink operation
  */
 @Service
-public class KafkaStreamSinkOperation implements StreamSinkOperation {
+public class KafkaSinkOperation implements StreamSinkOperation {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSinkOperation.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkOperation.class);
 
     @Autowired
     private ObjectMapper objectMapper;
@@ -77,7 +76,7 @@ public class KafkaStreamSinkOperation implements StreamSinkOperation {
     @Override
     public Integer saveOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_KAFKA.equals(sinkType),
+        Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(sinkType),
                 ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
 
         KafkaSinkRequest kafkaSinkRequest = (KafkaSinkRequest) request;
@@ -140,12 +139,11 @@ public class KafkaStreamSinkOperation implements StreamSinkOperation {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_KAFKA.equals(existType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_KAFKA, existType));
+        Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, existType));
         SinkResponse response = this.getFromEntity(entity, KafkaSinkResponse::new);
         List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
-        List<SinkFieldResponse> infos = CommonBeanUtils.copyListProperties(entities,
-                SinkFieldResponse::new);
+        List<SinkFieldResponse> infos = CommonBeanUtils.copyListProperties(entities, SinkFieldResponse::new);
         response.setFieldList(infos);
         return response;
     }
@@ -157,8 +155,8 @@ public class KafkaStreamSinkOperation implements StreamSinkOperation {
             return result;
         }
         String existType = entity.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_KAFKA.equals(existType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_KAFKA, existType));
+        Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, existType));
 
         KafkaSinkDTO dto = KafkaSinkDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
@@ -178,8 +176,8 @@ public class KafkaStreamSinkOperation implements StreamSinkOperation {
     @Override
     public void updateOpt(SinkRequest request, String operator) {
         String sinkType = request.getSinkType();
-        Preconditions.checkTrue(Constant.SINK_KAFKA.equals(sinkType),
-                String.format(Constant.SINK_TYPE_NOT_SAME, Constant.SINK_KAFKA, sinkType));
+        Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(sinkType),
+                String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, sinkType));
 
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
index 7ab709784..e99c00421 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
@@ -44,9 +44,9 @@ import java.util.List;
 /**
  * Default operation of stream source.
  */
-public abstract class AbstractStreamSourceOperation implements StreamSourceOperation {
+public abstract class AbstractSourceOperation implements StreamSourceOperation {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamSourceOperation.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceOperation.class);
     @Autowired
     protected StreamSourceEntityMapper sourceMapper;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
index 2c0d296c6..3f9c35923 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
-import org.apache.inlong.manager.service.source.AbstractStreamSourceOperation;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -44,7 +44,7 @@ import java.util.function.Supplier;
  * Binlog source operation
  */
 @Service
-public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {
+public class BinlogSourceOperation extends AbstractSourceOperation {
 
     @Autowired
     private ObjectMapper objectMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
index 29e1e4938..da5bc72b7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
@@ -29,14 +29,14 @@ import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
-import org.apache.inlong.manager.service.source.AbstractStreamSourceOperation;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.function.Supplier;
 
 @Service
-public class FileStreamSourceOperation extends AbstractStreamSourceOperation {
+public class FileSourceOperation extends AbstractSourceOperation {
 
     @Autowired
     private ObjectMapper objectMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
index f54a3a782..3fd771d74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
-import org.apache.inlong.manager.service.source.AbstractStreamSourceOperation;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -44,7 +44,7 @@ import java.util.function.Supplier;
  * kafka stream source operation.
  */
 @Service
-public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
+public class KafkaSourceOperation extends AbstractSourceOperation {
 
     @Autowired
     private ObjectMapper objectMapper;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
index 5555bcfc8..214d5f0f2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/hive/CreateHiveTableEventSelector.java
@@ -20,13 +20,13 @@ package org.apache.inlong.manager.service.thirdparty.hive;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -54,7 +54,7 @@ public class CreateHiveTableEventSelector implements EventSelector {
             return false;
         }
         String groupId = form.getInlongGroupId();
-        List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, Constant.SINK_HIVE,
+        List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
                 streamMapper.selectByGroupId(groupId)
                         .stream()
                         .map(InlongStreamEntity::getInlongStreamId)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index ad9a734d5..d1490c020 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HivePartitionField;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.sort.protocol.FieldInfo;
@@ -40,6 +41,7 @@ import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFieldPartitionInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveTimePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.IcebergSinkInfo;
 import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 
@@ -66,10 +68,12 @@ public class SinkInfoUtils {
             sinkInfo = createHiveSinkInfo((HiveSinkResponse) sinkResponse, sinkFields);
         } else if (SinkType.forType(sinkType) == SinkType.KAFKA) {
             sinkInfo = createKafkaSinkInfo(sourceResponse, (KafkaSinkResponse) sinkResponse, sinkFields);
+        } else if (SinkType.SINK_ICEBERG.equals(sinkType)) {
+            sinkInfo = createIcebergSinkInfo((IcebergSinkResponse) sinkResponse, sinkFields);
         } else if (SinkType.forType(sinkType) == SinkType.CLICKHOUSE) {
             sinkInfo = createClickhouseSinkInfo((ClickHouseSinkResponse) sinkResponse, sinkFields);
         } else {
-            throw new RuntimeException(String.format("Unsupported SinkType {%s}", sinkType));
+            throw new BusinessException(String.format("Unsupported SinkType {%s}", sinkType));
         }
         return sinkInfo;
     }
@@ -77,16 +81,16 @@ public class SinkInfoUtils {
     private static ClickHouseSinkInfo createClickhouseSinkInfo(ClickHouseSinkResponse sinkResponse,
             List<FieldInfo> sinkFields) {
         if (StringUtils.isEmpty(sinkResponse.getJdbcUrl())) {
-            throw new RuntimeException(String.format("ClickHouse={%s} server url cannot be empty", sinkResponse));
+            throw new BusinessException(String.format("ClickHouse={%s} server url cannot be empty", sinkResponse));
         } else if (CollectionUtils.isEmpty(sinkResponse.getFieldList())) {
-            throw new RuntimeException(String.format("ClickHouse={%s} fields cannot be empty", sinkResponse));
+            throw new BusinessException(String.format("ClickHouse={%s} fields cannot be empty", sinkResponse));
         } else if (StringUtils.isEmpty(sinkResponse.getTableName())) {
-            throw new RuntimeException(String.format("ClickHouse={%s} table name cannot be empty", sinkResponse));
+            throw new BusinessException(String.format("ClickHouse={%s} table name cannot be empty", sinkResponse));
         } else if (StringUtils.isEmpty(sinkResponse.getDatabaseName())) {
-            throw new RuntimeException(String.format("ClickHouse={%s} database name cannot be empty", sinkResponse));
+            throw new BusinessException(String.format("ClickHouse={%s} database name cannot be empty", sinkResponse));
         }
         if (sinkResponse.getDistributedTable() == null) {
-            throw new RuntimeException(String.format("ClickHouse={%s} distribute cannot be empty", sinkResponse));
+            throw new BusinessException(String.format("ClickHouse={%s} distribute cannot be empty", sinkResponse));
         }
 
         ClickHouseSinkInfo.PartitionStrategy partitionStrategy;
@@ -108,6 +112,15 @@ public class SinkInfoUtils {
                 sinkResponse.getWriteMaxRetryTimes());
     }
 
+    // TODO Need set more configs for IcebergSinkInfo
+    private static IcebergSinkInfo createIcebergSinkInfo(IcebergSinkResponse sinkResponse, List<FieldInfo> sinkFields) {
+        if (StringUtils.isEmpty(sinkResponse.getDataPath())) {
+            throw new BusinessException(String.format("Iceberg={%s} data path cannot be empty", sinkResponse));
+        }
+
+        return new IcebergSinkInfo(sinkFields.toArray(new FieldInfo[0]), sinkResponse.getDataPath());
+    }
+
     private static KafkaSinkInfo createKafkaSinkInfo(SourceResponse sourceResponse, KafkaSinkResponse sinkResponse,
             List<FieldInfo> sinkFields) {
         String addressUrl = sinkResponse.getBootstrapServers();
@@ -122,10 +135,10 @@ public class SinkInfoUtils {
      */
     private static HiveSinkInfo createHiveSinkInfo(HiveSinkResponse hiveInfo, List<FieldInfo> sinkFields) {
         if (hiveInfo.getJdbcUrl() == null) {
-            throw new RuntimeException(String.format("HiveSink={%s} server url cannot be empty", hiveInfo));
+            throw new BusinessException(String.format("HiveSink={%s} server url cannot be empty", hiveInfo));
         }
         if (CollectionUtils.isEmpty(hiveInfo.getFieldList())) {
-            throw new RuntimeException(String.format("HiveSink={%s} fields cannot be empty", hiveInfo));
+            throw new BusinessException(String.format("HiveSink={%s} fields cannot be empty", hiveInfo));
         }
         // Use the field separator in Hive, the default is TextFile
         Character separator = (char) Integer.parseInt(hiveInfo.getDataSeparator());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index e5c106d8c..373f0eea7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.workflow.stream;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableForStreamListener;
@@ -124,7 +124,7 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
             GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
             String groupId = form.getInlongGroupId();
             String streamId = form.getInlongStreamId();
-            List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, Constant.SINK_HIVE,
+            List<String> dsForHive = sinkService.getExistsStreamIdList(groupId, SinkType.SINK_HIVE,
                     Collections.singletonList(streamId));
             if (CollectionUtils.isEmpty(dsForHive)) {
                 log.warn("inlong group [{}] adn inlong stream [{}] does not have sink, skip create hive table", groupId,
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
index 9d9e75335..0484cd74f 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/ClickHouseStreamSinkServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.core.sink;
 
 import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
@@ -58,7 +59,7 @@ public class ClickHouseStreamSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setInlongGroupId(globalGroupId);
         sinkInfo.setInlongStreamId(globalStreamId);
         sinkInfo.setSinkName(sinkName);
-        sinkInfo.setSinkType(Constant.SINK_CLICKHOUSE);
+        sinkInfo.setSinkType(SinkType.SINK_CLICKHOUSE);
         sinkInfo.setJdbcUrl(ckJdbcUrl);
         sinkInfo.setUsername(ckUsername);
         sinkInfo.setDatabaseName(ckDatabaseName);
@@ -69,19 +70,19 @@ public class ClickHouseStreamSinkServiceTest extends ServiceBaseTest {
 
     @After
     public void deleteKafkaSink() {
-        boolean result = sinkService.delete(sinkId, Constant.SINK_CLICKHOUSE, globalOperator);
+        boolean result = sinkService.delete(sinkId, SinkType.SINK_CLICKHOUSE, globalOperator);
         Assert.assertTrue(result);
     }
 
     @Test
     public void testListByIdentifier() {
-        SinkResponse sink = sinkService.get(sinkId, Constant.SINK_CLICKHOUSE);
+        SinkResponse sink = sinkService.get(sinkId, SinkType.SINK_CLICKHOUSE);
         Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
     }
 
     @Test
     public void testGetAndUpdate() {
-        SinkResponse response = sinkService.get(sinkId, Constant.SINK_CLICKHOUSE);
+        SinkResponse response = sinkService.get(sinkId, SinkType.SINK_CLICKHOUSE);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         ClickHouseSinkResponse kafkaSinkResponse = (ClickHouseSinkResponse) response;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
index bb51ef5b2..3f49f3dc4 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/HiveStreamSinkServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.core.sink;
 
 import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
@@ -50,7 +51,7 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
         HiveSinkRequest sinkInfo = new HiveSinkRequest();
         sinkInfo.setInlongGroupId(globalGroupId);
         sinkInfo.setInlongStreamId(globalStreamId);
-        sinkInfo.setSinkType(Constant.SINK_HIVE);
+        sinkInfo.setSinkType(SinkType.SINK_HIVE);
         sinkInfo.setEnableCreateResource(Constant.DISABLE_CREATE_RESOURCE);
         sinkInfo.setSinkName(sinkName);
         return sinkService.save(sinkInfo, globalOperator);
@@ -61,7 +62,7 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
         Integer id = this.saveSink();
         Assert.assertNotNull(id);
 
-        boolean result = sinkService.delete(id, Constant.SINK_HIVE, globalOperator);
+        boolean result = sinkService.delete(id, SinkType.SINK_HIVE, globalOperator);
         Assert.assertTrue(result);
     }
 
@@ -69,16 +70,16 @@ public class HiveStreamSinkServiceTest extends ServiceBaseTest {
     public void testListByIdentifier() {
         Integer id = this.saveSink();
 
-        SinkResponse sink = sinkService.get(id, Constant.SINK_HIVE);
+        SinkResponse sink = sinkService.get(id, SinkType.SINK_HIVE);
         Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
 
-        sinkService.delete(id, Constant.SINK_HIVE, globalOperator);
+        sinkService.delete(id, SinkType.SINK_HIVE, globalOperator);
     }
 
     @Test
     public void testGetAndUpdate() {
         Integer id = this.saveSink();
-        SinkResponse response = sinkService.get(id, Constant.SINK_HIVE);
+        SinkResponse response = sinkService.get(id, SinkType.SINK_HIVE);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         HiveSinkResponse hiveResponse = (HiveSinkResponse) response;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
index b3ad88ca9..2cc955477 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/IcebergStreamSinkServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.core.sink;
 
 import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
@@ -46,7 +47,7 @@ public class IcebergStreamSinkServiceTest extends ServiceBaseTest {
         IcebergSinkRequest sinkInfo = new IcebergSinkRequest();
         sinkInfo.setInlongGroupId(globalGroupId);
         sinkInfo.setInlongStreamId(globalStreamId);
-        sinkInfo.setSinkType(Constant.SINK_ICEBERG);
+        sinkInfo.setSinkType(SinkType.SINK_ICEBERG);
         sinkInfo.setEnableCreateResource(Constant.DISABLE_CREATE_RESOURCE);
         sinkInfo.setDataPath("hdfs://127.0.0.1:8020/data");
         sinkInfo.setSinkName(sinkName);
@@ -57,22 +58,22 @@ public class IcebergStreamSinkServiceTest extends ServiceBaseTest {
     public void testSaveAndDelete() {
         Integer id = this.saveSink();
         Assert.assertNotNull(id);
-        boolean result = sinkService.delete(id, Constant.SINK_ICEBERG, globalOperator);
+        boolean result = sinkService.delete(id, SinkType.SINK_ICEBERG, globalOperator);
         Assert.assertTrue(result);
     }
 
     @Test
     public void testListByIdentifier() {
         Integer id = this.saveSink();
-        SinkResponse sink = sinkService.get(id, Constant.SINK_ICEBERG);
+        SinkResponse sink = sinkService.get(id, SinkType.SINK_ICEBERG);
         Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
-        sinkService.delete(id, Constant.SINK_ICEBERG, globalOperator);
+        sinkService.delete(id, SinkType.SINK_ICEBERG, globalOperator);
     }
 
     @Test
     public void testGetAndUpdate() {
         Integer id = this.saveSink();
-        SinkResponse response = sinkService.get(id, Constant.SINK_ICEBERG);
+        SinkResponse response = sinkService.get(id, SinkType.SINK_ICEBERG);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         IcebergSinkResponse icebergSinkResponse = (IcebergSinkResponse) response;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
index 9d441349f..6f48f7812 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/KafkaStreamSinkServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.core.sink;
 
 import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
@@ -56,7 +57,7 @@ public class KafkaStreamSinkServiceTest extends ServiceBaseTest {
         KafkaSinkRequest sinkInfo = new KafkaSinkRequest();
         sinkInfo.setInlongGroupId(globalGroupId);
         sinkInfo.setInlongStreamId(globalStreamId);
-        sinkInfo.setSinkType(Constant.SINK_KAFKA);
+        sinkInfo.setSinkType(SinkType.SINK_KAFKA);
         sinkInfo.setSinkName(sinkName);
         sinkInfo.setSerializationType(serializationType);
         sinkInfo.setBootstrapServers(bootstrapServers);
@@ -67,19 +68,19 @@ public class KafkaStreamSinkServiceTest extends ServiceBaseTest {
 
     @After
     public void deleteKafkaSink() {
-        boolean result = sinkService.delete(kafkaSinkId, Constant.SINK_KAFKA, globalOperator);
+        boolean result = sinkService.delete(kafkaSinkId, SinkType.SINK_KAFKA, globalOperator);
         Assert.assertTrue(result);
     }
 
     @Test
     public void testListByIdentifier() {
-        SinkResponse sink = sinkService.get(kafkaSinkId, Constant.SINK_KAFKA);
+        SinkResponse sink = sinkService.get(kafkaSinkId, SinkType.SINK_KAFKA);
         Assert.assertEquals(globalGroupId, sink.getInlongGroupId());
     }
 
     @Test
     public void testGetAndUpdate() {
-        SinkResponse response = sinkService.get(kafkaSinkId, Constant.SINK_KAFKA);
+        SinkResponse response = sinkService.get(kafkaSinkId, SinkType.SINK_KAFKA);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         KafkaSinkResponse kafkaSinkResponse = (KafkaSinkResponse) response;