You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/27 02:34:12 UTC

[incubator-inlong] branch master updated: [INLONG-2734][Manager] Support multi serialization type for Sort in Manager (#2750)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d3dc50  [INLONG-2734][Manager] Support multi serialization type for Sort in Manager (#2750)
4d3dc50 is described below

commit 4d3dc505d6c246a3c8a3b4951cb58fbe683eb563
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sun Feb 27 10:34:07 2022 +0800

    [INLONG-2734][Manager] Support multi serialization type for Sort in Manager (#2750)
---
 .../inlong/manager/common/enums/SinkType.java      |   2 +-
 .../inlong/manager/common/enums/SourceType.java    |   4 +-
 .../manager/common/pojo/source/SourceResponse.java |   2 +
 .../pojo/source/binlog/BinlogSourceRequest.java    |   2 +
 .../pojo/source/binlog/BinlogSourceResponse.java   |   3 +
 .../common/pojo/source/kafka/KafkaSourceDTO.java   |   4 +
 .../service/sink/StreamSinkServiceImpl.java        |   8 +-
 .../service/source/StreamSourceServiceImpl.java    |   8 +-
 .../thirdparty/sort/CreateSortConfigListener.java  |  40 +++---
 .../thirdparty/sort/PushSortConfigListener.java    |   3 +
 .../thirdparty/sort/utils/SerializationUtils.java  | 138 +++++++++++++++++++++
 .../thirdparty/sort/{ => utils}/SinkInfoUtils.java |  30 ++++-
 .../sort/{ => utils}/SortFieldFormatUtils.java     |   2 +-
 .../sort/{ => utils}/SourceInfoUtils.java          |   2 +-
 14 files changed, 219 insertions(+), 29 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index f0335e7..19a547b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -26,7 +26,7 @@ public enum SinkType {
     /**
      * Get the SinkType enum via the given sinkType string
      */
-    public static SinkType getType(String sinkType) {
+    public static SinkType forType(String sinkType) {
         for (SinkType type : values()) {
             if (type.name().equals(sinkType)) {
                 return type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index f224b49..1b4f0b4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -35,7 +35,7 @@ public enum SourceType {
     /**
      * Get the SourceType enum via the given sourceType string
      */
-    public static SourceType getType(String sourceType) {
+    public static SourceType forType(String sourceType) {
         for (SourceType type : values()) {
             if (type.name().equals(sourceType)) {
                 return type;
@@ -44,7 +44,7 @@ public enum SourceType {
         throw new IllegalArgumentException(String.format("Illegal sink type for %s", sourceType));
     }
 
-    public String getType() {
+    public String forType() {
         return this.type;
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 4d90669..d596c1e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -81,4 +81,6 @@ public class SourceResponse {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
+    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+    private String serializationType;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 37efa92..0d352a4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -77,5 +77,7 @@ public class BinlogSourceRequest extends SourceRequest {
     @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
     private String snapshotMode;
 
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard = "SQL";
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index ae01fb3..a6e2fa5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -63,4 +63,7 @@ public class BinlogSourceResponse extends SourceResponse {
     @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
     private String snapshotMode;
 
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard = "SQL";
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index f793223..dffdda0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -61,6 +61,9 @@ public class KafkaSourceDTO {
             notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
     private String topicPartitionOffset;
 
+    @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+    private String serializationType;
+
     /**
      * Get the dto instance from the request
      */
@@ -72,6 +75,7 @@ public class KafkaSourceDTO {
                 .recordSpeedLimit(request.getRecordSpeedLimit())
                 .byteSpeedLimit(request.getByteSpeedLimit())
                 .topicPartitionOffset(request.getTopicPartitionOffset())
+                .serializationType(request.getSerializationType())
                 .build();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 17fbc61..5f8f2bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -107,7 +107,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         Preconditions.checkEmpty(sinkExist, ErrorCodeEnum.SINK_ALREADY_EXISTS.getMessage());
 
         // According to the sink type, save sink information
-        StreamSinkOperation operation = operationFactory.getInstance(SinkType.getType(sinkType));
+        StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
         int id = operation.saveOpt(request, operator);
 
         // If the inlong group status is [Configuration Successful], then asynchronously initiate
@@ -123,7 +123,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Override
     public SinkResponse get(Integer id, String sinkType) {
         LOGGER.debug("begin to get sink by id={}, sinkType={}", id, sinkType);
-        StreamSinkOperation operation = operationFactory.getInstance(SinkType.getType(sinkType));
+        StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
         SinkResponse sinkResponse = operation.getById(sinkType, id);
         LOGGER.debug("success to get sink info");
         return sinkResponse;
@@ -178,7 +178,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>) sinkMapper.selectByCondition(request);
 
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
-        StreamSinkOperation operation = operationFactory.getInstance(SinkType.getType(sinkType));
+        StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
         PageInfo<? extends SinkListResponse> pageInfo = operation.getPageInfo(entityPage);
         pageInfo.setTotal(entityPage.getTotal());
 
@@ -200,7 +200,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         String streamId = request.getInlongStreamId();
         String sinkType = request.getSinkType();
 
-        StreamSinkOperation operation = operationFactory.getInstance(SinkType.getType(sinkType));
+        StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
         operation.updateOpt(request, operator);
 
         // The inlong group status is [Configuration successful], then asynchronously initiate
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index cea9fd8..18e837e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -82,7 +82,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         Preconditions.checkEmpty(sourceExist, ErrorCodeEnum.SOURCE_ALREADY_EXISTS.getMessage());
 
         // According to the source type, save source information
-        StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
+        StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
         int id = operation.saveOpt(request, operator);
 
         LOGGER.info("success to save source info");
@@ -92,7 +92,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     @Override
     public SourceResponse get(Integer id, String sourceType) {
         LOGGER.debug("begin to get source by id={}, sourceType={}", id, sourceType);
-        StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
+        StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
         SourceResponse sourceResponse = operation.getById(id);
         LOGGER.debug("success to get source info");
         return sourceResponse;
@@ -134,7 +134,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         Page<StreamSourceEntity> entityPage = (Page<StreamSourceEntity>) sourceMapper.selectByCondition(request);
 
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
-        StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
+        StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
         PageInfo<? extends SourceListResponse> pageInfo = operation.getPageInfo(entityPage);
         pageInfo.setTotal(entityPage.getTotal());
 
@@ -154,7 +154,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         commonOperateService.checkGroupStatus(groupId, operator);
 
         String sourceType = request.getSourceType();
-        StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
+        StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
         operation.updateOpt(request, operator);
 
         LOGGER.info("success to update source info");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index cbea893..c555924 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -39,6 +40,11 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SerializationUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SinkInfoUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SortFieldFormatUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SourceInfoUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -47,7 +53,6 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
-import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
@@ -71,6 +76,8 @@ public class CreateSortConfigListener implements SortOperateListener {
     private InlongStreamService inlongStreamService;
     @Autowired
     private StreamSinkService streamSinkService;
+    @Autowired
+    private StreamSourceService streamSourceService;
 
     @Override
     public TaskEvent event() {
@@ -119,29 +126,32 @@ public class CreateSortConfigListener implements SortOperateListener {
 
     private DataFlowInfo createDataFlow(StreamBriefResponse streamBriefResponse,
             InlongGroupRequest inlongGroupRequest) {
+        //TODO only support one source and one sink
+        final String groupId = streamBriefResponse.getInlongGroupId();
+        final String streamId = streamBriefResponse.getInlongStreamId();
+        final InlongStreamInfo streamInfo = inlongStreamService.get(groupId, streamId);
+        List<SourceResponse> sourceResponses = streamSourceService.listSource(groupId, streamId);
+        if (CollectionUtils.isEmpty(sourceResponses)) {
+            throw new RuntimeException(String.format("No source found by stream=%s", streamBriefResponse));
+        }
+        final SourceResponse sourceResponse = sourceResponses.get(0);
         List<SinkBriefResponse> sinkBriefResponses = streamBriefResponse.getSinkList();
         if (CollectionUtils.isEmpty(sinkBriefResponses)) {
             throw new RuntimeException(String.format("No sink found by stream=%s", streamBriefResponse));
         }
-        SinkBriefResponse sinkBriefResponse = sinkBriefResponses.get(0);
+        final SinkBriefResponse sinkBriefResponse = sinkBriefResponses.get(0);
         String sinkType = sinkBriefResponse.getSinkType();
         int sinkId = sinkBriefResponse.getId();
-        SinkResponse sinkResponse = streamSinkService.get(sinkId, sinkType);
-        SourceInfo sourceInfo = createSourceInfo(inlongGroupRequest, sinkResponse);
-        SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sinkResponse);
+        final SinkResponse sinkResponse = streamSinkService.get(sinkId, sinkType);
+        SourceInfo sourceInfo = createSourceInfo(inlongGroupRequest, streamInfo, sourceResponse);
+        SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(streamInfo, sourceResponse, sinkResponse);
         return new DataFlowInfo(sinkId, sourceInfo, sinkInfo);
     }
 
-    private SourceInfo createSourceInfo(InlongGroupRequest groupRequest, SinkResponse sinkResponse) {
+    private SourceInfo createSourceInfo(InlongGroupRequest groupRequest, InlongStreamInfo streamInfo,
+            SourceResponse sourceResponse) {
         String middleWareType = groupRequest.getMiddlewareType();
-        String groupId = sinkResponse.getInlongGroupId();
-        String streamId = sinkResponse.getInlongStreamId();
-        InlongStreamInfo streamInfo = inlongStreamService.get(groupId, streamId);
-        DeserializationInfo deserializationInfo = null;
-        if (StringUtils.isNotEmpty(streamInfo.getDataSeparator())) {
-            char separator = (char) Integer.parseInt(streamInfo.getDataSeparator());
-            deserializationInfo = new InLongMsgCsvDeserializationInfo(streamId, separator);
-        }
+
         List<FieldInfo> fieldInfos = Lists.newArrayList();
         if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
             fieldInfos = streamInfo.getFieldList().stream().map(inlongStreamFieldInfo -> {
@@ -150,6 +160,8 @@ public class CreateSortConfigListener implements SortOperateListener {
                 return new FieldInfo(inlongStreamFieldInfo.getFieldName(), formatInfo);
             }).collect(Collectors.toList());
         }
+        DeserializationInfo deserializationInfo = SerializationUtils.createDeserializationInfo(sourceResponse,
+                streamInfo);
         if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
             return createPulsarSourceInfo(groupRequest, streamInfo, deserializationInfo, fieldInfos);
         } else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index 8b2890b..bd49771 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -35,6 +35,9 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SinkInfoUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SortFieldFormatUtils;
+import org.apache.inlong.manager.service.thirdparty.sort.utils.SourceInfoUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SerializationUtils.java
new file mode 100644
index 0000000..6c632f9
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SerializationUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdparty.sort.utils;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.sort.protocol.deserialization.AvroDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
+import org.apache.inlong.sort.protocol.deserialization.JsonDeserializationInfo;
+import org.apache.inlong.sort.protocol.serialization.AvroSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.DebeziumSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
+import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
+import org.springframework.util.Assert;
+
+public class SerializationUtils {
+
+    public static DeserializationInfo createDeserializationInfo(SourceResponse sourceResponse,
+            InlongStreamInfo streamInfo) {
+        SourceType sourceType = SourceType.forType(sourceResponse.getSourceType());
+        switch (sourceType) {
+            case DB_BINLOG:
+                return forBinlog((BinlogSourceResponse) sourceResponse, streamInfo);
+            case KAFKA:
+                return forKafka((KafkaSourceResponse) sourceResponse, streamInfo);
+            case FILE:
+                return forFile(sourceResponse, streamInfo);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupport sourceType for Inlong:%s", sourceType));
+        }
+    }
+
+    public static SerializationInfo createSerializationInfo(SourceResponse sourceResponse, SinkResponse sinkResponse,
+            InlongStreamInfo inlongStreamInfo) {
+        SinkType sinkType = SinkType.forType(sinkResponse.getSinkType());
+        switch (sinkType) {
+            case HIVE:
+                return null;
+            case KAFKA:
+                return forKafka(sourceResponse, (KafkaSinkResponse) sinkResponse, inlongStreamInfo);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupport sinkType for Inlong:%s", sinkType));
+        }
+    }
+
+    public static DeserializationInfo forBinlog(BinlogSourceResponse binlogSourceResponse,
+            InlongStreamInfo streamInfo) {
+        return new DebeziumDeserializationInfo(true, binlogSourceResponse.getTimestampFormatStandard());
+    }
+
+    public static DeserializationInfo forKafka(KafkaSourceResponse kafkaSourceResponse,
+            InlongStreamInfo streamInfo) {
+        String serializationType = kafkaSourceResponse.getSerializationType();
+        DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
+        switch (dataType) {
+            case CSV:
+                char seperator = streamInfo.getDataSeparator().toCharArray()[0];
+                return new CsvDeserializationInfo(seperator);
+            case AVRO:
+                return new AvroDeserializationInfo();
+            case JSON:
+                return new JsonDeserializationInfo();
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unsupport serializationType for Kafka source:%s", serializationType));
+        }
+    }
+
+    public static SerializationInfo forKafka(SourceResponse sourceResponse, KafkaSinkResponse kafkaSinkResponse,
+            InlongStreamInfo streamInfo) {
+        String serializationType = kafkaSinkResponse.getSerializationType();
+        DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
+        switch (dataType) {
+            case AVRO:
+                return new AvroSerializationInfo();
+            case JSON:
+                return new JsonSerializationInfo();
+            case CANAL:
+                Assert.isInstanceOf(BinlogSourceResponse.class, sourceResponse,
+                        "Unsupport serializationType for Kafka;");
+                BinlogSourceResponse binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
+                return new CanalSerializationInfo(binlogSourceResponse.getTimestampFormatStandard(),
+                        "FAIL", "", false);
+            case DEBEZIUM_JSON:
+                Assert.isInstanceOf(BinlogSourceResponse.class, sourceResponse,
+                        "Unsupport serializationType for Kafka;");
+                binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
+                return new DebeziumSerializationInfo(binlogSourceResponse.getTimestampFormatStandard(),
+                        "FAIL", "", false);
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unsupport serializationType for Kafka sink:%s", serializationType));
+        }
+    }
+
+    public static DeserializationInfo forFile(SourceResponse sourceResponse,
+            InlongStreamInfo streamInfo) {
+        String serializationType = sourceResponse.getSerializationType();
+        DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
+        switch (dataType) {
+            case CSV:
+                char seperator = streamInfo.getDataSeparator().toCharArray()[0];
+                return new CsvDeserializationInfo(seperator);
+            case AVRO:
+                return new AvroDeserializationInfo();
+            case JSON:
+                return new JsonDeserializationInfo();
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unsupport type for File source:%s", serializationType));
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SinkInfoUtils.java
similarity index 81%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SinkInfoUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SinkInfoUtils.java
index fcd650a..3db4900 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SinkInfoUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.thirdparty.sort;
+package org.apache.inlong.manager.service.thirdparty.sort.utils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,13 +29,18 @@ import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveTimePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 
 public class SinkInfoUtils {
@@ -55,9 +60,20 @@ public class SinkInfoUtils {
     }
 
     public static SinkInfo createSinkInfo(SinkResponse sinkResponse) {
+        return createSinkInfo(null, null, sinkResponse);
+    }
+
+    public static SinkInfo createSinkInfo(SourceResponse sourceResponse, SinkResponse sinkResponse) {
+        return createSinkInfo(null, sourceResponse, sinkResponse);
+    }
+
+    public static SinkInfo createSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
+            SinkResponse sinkResponse) {
         String sinkType = sinkResponse.getSinkType();
-        if (SinkType.getType(sinkType) == SinkType.HIVE) {
+        if (SinkType.forType(sinkType) == SinkType.HIVE) {
             return createHiveSinkInfo((HiveSinkResponse) sinkResponse);
+        } else if (SinkType.forType(sinkType) == SinkType.KAFKA) {
+            return createKafkaSinkInfo(inlongStreamInfo, sourceResponse, (KafkaSinkResponse) sinkResponse);
         } else {
             //todo clickhouse and iceberg is wait to support
             throw new RuntimeException(
@@ -65,6 +81,16 @@ public class SinkInfoUtils {
         }
     }
 
+    private static KafkaSinkInfo createKafkaSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
+            KafkaSinkResponse kafkaSinkResponse) {
+        List<FieldInfo> fieldInfoList = getSinkFields(kafkaSinkResponse.getFieldList(), null);
+        String addressUrl = kafkaSinkResponse.getAddress();
+        String topicName = kafkaSinkResponse.getTopicName();
+        SerializationInfo serializationInfo = SerializationUtils.createSerializationInfo(sourceResponse,
+                kafkaSinkResponse, inlongStreamInfo);
+        return new KafkaSinkInfo(fieldInfoList.toArray(new FieldInfo[0]), addressUrl, topicName, serializationInfo);
+    }
+
     private static HiveSinkInfo createHiveSinkInfo(HiveSinkResponse hiveInfo) {
         if (hiveInfo.getJdbcUrl() == null) {
             throw new RuntimeException(String.format("hiveSink={} server url cannot be empty", hiveInfo));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SortFieldFormatUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SortFieldFormatUtils.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SortFieldFormatUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SortFieldFormatUtils.java
index 0e6be39..43cf09b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SortFieldFormatUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SortFieldFormatUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.thirdparty.sort;
+package org.apache.inlong.manager.service.thirdparty.sort.utils;
 
 import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
 import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SourceInfoUtils.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SourceInfoUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SourceInfoUtils.java
index e7ff4a9..66bfb86 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/utils/SourceInfoUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.thirdparty.sort;
+package org.apache.inlong.manager.service.thirdparty.sort.utils;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;