You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/03/09 09:33:12 UTC

[incubator-inlong] branch master updated: [INLONG-3012][Manager] Support built-in field for source and sink info (#3016)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cb8500e  [INLONG-3012][Manager] Support built-in field for source and sink info (#3016)
cb8500e is described below

commit cb8500e913fa65c3674822fda2f67b7287c04e1f
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 9 17:33:06 2022 +0800

    [INLONG-3012][Manager] Support built-in field for source and sink info (#3016)
---
 .../inlong/manager/common/enums/Constant.java      |   1 +
 .../common/pojo/stream/InlongStreamInfo.java       |   7 +-
 .../thirdparty/sort/CreateSortConfigListener.java  | 168 +++++++--------------
 .../thirdparty/sort/PushSortConfigListener.java    | 105 +++++++------
 .../thirdparty/sort/ZkDisabledEventSelector.java   |   5 +-
 .../thirdparty/sort/ZkEnabledEventSelector.java    |   5 +-
 .../thirdparty/sort/util/SerializationUtils.java   |  74 +++++----
 .../thirdparty/sort/util/SinkInfoUtils.java        |  68 ++++-----
 .../thirdparty/sort/util/SourceInfoUtils.java      | 163 +++++++++++++++++---
 9 files changed, 331 insertions(+), 265 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 76e8f93..dc04265 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -23,6 +23,7 @@ package org.apache.inlong.manager.common.enums;
 public class Constant {
 
     public static final Integer UN_DELETED = 0;
+    public static final Integer IS_DELETED = 1;
 
     public static final String SOURCE_FILE = "FILE";
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
index 7f2724b..b82cb37 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
@@ -20,11 +20,12 @@ package org.apache.inlong.manager.common.pojo.stream;
 import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import java.util.List;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
+import java.util.Date;
+import java.util.List;
+
 /**
  * Inlong stream info
  */
@@ -52,7 +53,7 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
     @ApiModelProperty(value = "Data storage period, unit: day (required when dataSourceType=AUTO_PUSH)")
     private Integer storagePeriod;
 
-    @ApiModelProperty(value = "Data type, only support: TEXT")
+    @ApiModelProperty(value = "Data type, including: TEXT, KV, etc.")
     private String dataType;
 
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK (required when dataSourceType=FILE, AUTO_PUSH)")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 0b439eb..969dd69 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -17,22 +17,17 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@@ -42,42 +37,43 @@ import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessF
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SerializationUtils;
 import org.apache.inlong.manager.service.thirdparty.sort.util.SinkInfoUtils;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SortFieldFormatUtils;
 import org.apache.inlong.manager.service.thirdparty.sort.util.SourceInfoUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
-import org.apache.inlong.sort.formats.common.FormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
-import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-@Slf4j
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Create sort config when ZooKeeper is disabled
+ */
 @Component
 public class CreateSortConfigListener implements SortOperateListener {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(CreateSortConfigListener.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
     @Autowired
     private CommonOperateService commonOperateService;
     @Autowired
     private ClusterBean clusterBean;
     @Autowired
-    private InlongStreamService inlongStreamService;
+    private InlongStreamService streamService;
     @Autowired
     private StreamSinkService streamSinkService;
     @Autowired
@@ -90,33 +86,28 @@ public class CreateSortConfigListener implements SortOperateListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        log.info("Create sort config for context={}", context);
+        LOGGER.info("Create sort config for context={}", context);
         ProcessForm form = context.getProcessForm();
-        InlongGroupInfo groupInfo = getGroupInfo(form);
+        InlongGroupInfo groupInfo = this.getGroupInfo(form);
         String groupId = groupInfo.getInlongGroupId();
         if (StringUtils.isEmpty(groupId)) {
-            log.warn("GroupId is null for context={}", context);
+            LOGGER.warn("GroupId is null for context={}", context);
             return ListenerResult.success();
         }
-        List<StreamBriefResponse> streamBriefResponses = inlongStreamService.getBriefList(groupId);
+
+        List<StreamBriefResponse> streamBriefResponses = streamService.getBriefList(groupId);
         if (CollectionUtils.isEmpty(streamBriefResponses)) {
-            log.warn("Stream not found by groupId={}", groupId);
+            LOGGER.warn("Stream not found by groupId={}", groupId);
             return ListenerResult.success();
         }
 
-        Map<String, DataFlowInfo> dataFlowInfoMap = streamBriefResponses.stream().map(streamBriefResponse -> {
-                            DataFlowInfo flowInfo = createDataFlow(streamBriefResponse, groupInfo);
-                            if (flowInfo != null) {
-                                return Pair.of(streamBriefResponse.getInlongStreamId(), flowInfo);
-                            } else {
-                                return null;
-                            }
-                        }
-                ).filter(pair -> pair != null)
-                .collect(Collectors.toMap(pair -> pair.getKey(),
-                        pair -> pair.getValue()));
-        final ObjectMapper objectMapper = new ObjectMapper();
-        String dataFlows = objectMapper.writeValueAsString(dataFlowInfoMap);
+        Map<String, DataFlowInfo> dataFlowInfoMap = streamBriefResponses.stream().map(streamResponse -> {
+                    DataFlowInfo flowInfo = createDataFlow(streamResponse, groupInfo);
+                    return Pair.of(streamResponse.getInlongStreamId(), flowInfo);
+                }
+        ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+        String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
         InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
         extInfo.setInlongGroupId(groupId);
         extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
@@ -125,101 +116,46 @@ public class CreateSortConfigListener implements SortOperateListener {
             groupInfo.setExtList(Lists.newArrayList());
         }
         upsertDataFlow(groupInfo, extInfo);
+
         return ListenerResult.success();
     }
 
     private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
-        Iterator<InlongGroupExtInfo> inlongGroupExtInfoIterator = groupInfo.getExtList().iterator();
-        while (inlongGroupExtInfoIterator.hasNext()) {
-            InlongGroupExtInfo inlongGroupExtInfo = inlongGroupExtInfoIterator.next();
-            if (InlongGroupSettings.DATA_FLOW.equals(inlongGroupExtInfo.getKeyName())) {
-                inlongGroupExtInfoIterator.remove();
-            }
-        }
+        groupInfo.getExtList().removeIf(ext -> InlongGroupSettings.DATA_FLOW.equals(ext.getKeyName()));
         groupInfo.getExtList().add(extInfo);
     }
 
-    private DataFlowInfo createDataFlow(StreamBriefResponse streamBriefResponse,
-            InlongGroupInfo inlongGroupInfo) {
-        //TODO only support one source and one sink
-        final String groupId = streamBriefResponse.getInlongGroupId();
-        final String streamId = streamBriefResponse.getInlongStreamId();
-        final InlongStreamInfo streamInfo = inlongStreamService.get(groupId, streamId);
+    private DataFlowInfo createDataFlow(StreamBriefResponse streamResponse, InlongGroupInfo groupInfo) {
+        // TODO only support one source and one sink
+        final String groupId = streamResponse.getInlongGroupId();
+        final String streamId = streamResponse.getInlongStreamId();
+        final InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         List<SourceResponse> sourceResponses = streamSourceService.listSource(groupId, streamId);
         if (CollectionUtils.isEmpty(sourceResponses)) {
-            throw new RuntimeException(String.format("No source found by stream=%s", streamBriefResponse));
+            throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
+                    groupId, streamId));
         }
         final SourceResponse sourceResponse = sourceResponses.get(0);
-        List<SinkBriefResponse> sinkBriefResponses = streamBriefResponse.getSinkList();
+        List<SinkBriefResponse> sinkBriefResponses = streamResponse.getSinkList();
         if (CollectionUtils.isEmpty(sinkBriefResponses)) {
-            throw new RuntimeException(String.format("No sink found by stream=%s", streamBriefResponse));
+            throw new WorkflowListenerException(String.format("Sink not found by groupId=%s and streamId=%s",
+                    groupId, streamId));
         }
+
         final SinkBriefResponse sinkBriefResponse = sinkBriefResponses.get(0);
         String sinkType = sinkBriefResponse.getSinkType();
         int sinkId = sinkBriefResponse.getId();
         final SinkResponse sinkResponse = streamSinkService.get(sinkId, sinkType);
-        SourceInfo sourceInfo = createSourceInfo(inlongGroupInfo, streamInfo, sourceResponse);
-        SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(streamInfo, sourceResponse, sinkResponse);
-        return new DataFlowInfo(sinkId, sourceInfo, sinkInfo);
-    }
 
-    private SourceInfo createSourceInfo(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            SourceResponse sourceResponse) {
-        String middleWareType = groupInfo.getMiddlewareType();
-
-        List<FieldInfo> fieldInfos = Lists.newArrayList();
-        if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
-            fieldInfos.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
-                    BuiltInField.MYSQL_METADATA_DATA));
-
-        } else {
-            if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
-                fieldInfos = streamInfo.getFieldList().stream().map(inlongStreamFieldInfo -> {
-                    FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(
-                            inlongStreamFieldInfo.getFieldType().toLowerCase());
-                    return new FieldInfo(inlongStreamFieldInfo.getFieldName(), formatInfo);
-                }).collect(Collectors.toList());
-            }
-        }
-
-        DeserializationInfo deserializationInfo = SerializationUtils.createDeserializationInfo(sourceResponse,
-                streamInfo);
-        if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
-            return createPulsarSourceInfo(groupInfo, streamInfo, deserializationInfo, fieldInfos);
-        } else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) {
-            return createTubeSourceInfo(groupInfo, deserializationInfo, fieldInfos);
-        } else {
-            throw new RuntimeException(
-                    String.format("MiddleWare:{} not support in CreateSortConfigListener", middleWareType));
-        }
-
-    }
-
-    private SourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo,
-            InlongStreamInfo streamInfo,
-            DeserializationInfo deserializationInfo,
-            List<FieldInfo> fieldInfos) {
-        String topicName = streamInfo.getMqResourceObj();
-        PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
-        InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
-        String tenant = clusterBean.getDefaultTenant();
-        if (StringUtils.isNotEmpty(pulsarInfo.getTenant())) {
-            tenant = pulsarInfo.getTenant();
-        }
-        return SourceInfoUtils.createPulsarSourceInfo(groupInfo, topicName, deserializationInfo,
-                fieldInfos, clusterBean.getAppName(), pulsarClusterInfo, tenant);
-    }
-
-    private TubeSourceInfo createTubeSourceInfo(InlongGroupInfo groupInfo,
-            DeserializationInfo deserializationInfo,
-            List<FieldInfo> fieldInfos) {
+        // Get source info
         String masterAddress = commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL);
-        Preconditions.checkNotNull(masterAddress, "tube cluster address cannot be empty");
-        String topic = groupInfo.getMqResourceObj();
-        // The consumer group name is: taskName_topicName_consumer_group
-        String consumerGroup = clusterBean.getAppName() + "_" + topic + "_consumer_group";
-        return new TubeSourceInfo(topic, masterAddress, consumerGroup,
-                deserializationInfo, fieldInfos.toArray(new FieldInfo[0]));
+        PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo();
+        SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean, groupInfo,
+                streamInfo, sourceResponse, sinkResponse);
+        // Get sink info
+        SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse);
+
+        return new DataFlowInfo(sinkId, sourceInfo, sinkInfo);
     }
 
     private InlongGroupInfo getGroupInfo(ProcessForm processForm) {
@@ -230,9 +166,9 @@ public class CreateSortConfigListener implements SortOperateListener {
             UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
             return updateGroupProcessForm.getGroupInfo();
         } else {
-            log.error("Illegal ProcessForm {} to get inlong group info", processForm.getFormName());
-            throw new RuntimeException(String.format("Unsupport ProcessForm {} in CreateSortConfigListener",
-                    processForm.getFormName()));
+            LOGGER.error("Illegal ProcessForm {} to get inlong group info", processForm.getFormName());
+            throw new WorkflowListenerException(
+                    String.format("Unsupported ProcessForm {%s}", processForm.getFormName()));
         }
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index deaea4a..5dc1c34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -17,16 +17,16 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -38,32 +38,33 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.service.thirdparty.sort.util.SinkInfoUtils;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SortFieldFormatUtils;
 import org.apache.inlong.manager.service.thirdparty.sort.util.SourceInfoUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.apache.inlong.sort.ZkTools;
-import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.InLongMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
-import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.ArrayList;
 import java.util.List;
 
-@Slf4j
+/**
+ * Push sort config when ZooKeeper is enabled
+ */
 @Component
 public class PushSortConfigListener implements SortOperateListener {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(PushSortConfigListener.class);
     private static final String DATA_FLOW_GROUP_ID_KEY = "inlong.group.id";
 
     @Autowired
@@ -75,6 +76,8 @@ public class PushSortConfigListener implements SortOperateListener {
     @Autowired
     private InlongStreamService streamService;
     @Autowired
+    private StreamSourceService streamSourceService;
+    @Autowired
     private StreamSinkService streamSinkService;
     @Autowired
     private StreamSinkFieldEntityMapper streamSinkFieldMapper;
@@ -86,36 +89,36 @@ public class PushSortConfigListener implements SortOperateListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        if (log.isDebugEnabled()) {
-            log.debug("begin push hive config to sort, context={}", context);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("begin to push sort config by context={}", context);
         }
 
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
         InlongGroupInfo groupInfo = form.getGroupInfo();
         String groupId = groupInfo.getInlongGroupId();
-
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
-        if (groupEntity == null || EntityStatus.IS_DELETED.getCode().equals(groupEntity.getIsDeleted())) {
-            log.warn("skip to push sort hive config for groupId={}, as biz not exists or has been deleted", groupId);
+        if (groupEntity == null || Constant.IS_DELETED.equals(groupEntity.getIsDeleted())) {
+            LOGGER.warn("skip to push sort config for groupId={}, as biz not exists or has been deleted", groupId);
             return ListenerResult.success();
         }
 
         // if streamId not null, just push the config belongs to the groupId and the streamId
         String streamId = form.getInlongStreamId();
         List<SinkResponse> sinkResponses = streamSinkService.listSink(groupId, streamId);
-        for (SinkResponse sinkResponse : sinkResponses) {
-            Integer sinkId = sinkResponse.getId();
 
-            if (log.isDebugEnabled()) {
-                log.debug("sink info: {}", sinkResponse);
+        for (SinkResponse sinkResponse : sinkResponses) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("sink info: {}", sinkResponse);
             }
 
             DataFlowInfo dataFlowInfo = getDataFlowInfo(groupInfo, sinkResponse);
             // add extra properties for flow info
             dataFlowInfo.getProperties().put(DATA_FLOW_GROUP_ID_KEY, groupId);
-            if (log.isDebugEnabled()) {
-                log.debug("try to push hive config to sort: {}", JsonUtils.toJson(dataFlowInfo));
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("try to push config to sort: {}", JsonUtils.toJson(dataFlowInfo));
             }
+
+            Integer sinkId = sinkResponse.getId();
             try {
                 String zkUrl = clusterBean.getZkUrl();
                 String zkRoot = clusterBean.getZkRoot();
@@ -125,8 +128,8 @@ public class PushSortConfigListener implements SortOperateListener {
                 // add sink id to zk
                 ZkTools.addDataFlowToCluster(sortClusterName, sinkId, zkUrl, zkRoot);
             } catch (Exception e) {
-                log.error("add or update inlong stream information to zk failed, sinkId={} ", sinkId, e);
-                throw new WorkflowListenerException("push hive config to sort failed, reason: " + e.getMessage());
+                LOGGER.error("push sort config to zookeeper failed, sinkId={} ", sinkId, e);
+                throw new WorkflowListenerException("push sort config to zookeeper failed: " + e.getMessage());
             }
         }
 
@@ -139,25 +142,37 @@ public class PushSortConfigListener implements SortOperateListener {
         List<StreamSinkFieldEntity> fieldList = streamSinkFieldMapper.selectFields(groupId, streamId);
 
         if (fieldList == null || fieldList.size() == 0) {
-            throw new WorkflowListenerException("no hive fields for groupId=" + groupId + ", streamId=" + streamId);
+            throw new WorkflowListenerException(
+                    String.format("no fields for groupId=%s streamId=%s", groupId, streamId));
         }
 
-        SourceInfo sourceInfo = getSourceInfo(groupInfo, sinkResponse, fieldList);
-        SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sinkResponse);
+        List<SourceResponse> sourceList = streamSourceService.listSource(groupId, streamId);
+        if (CollectionUtils.isEmpty(sourceList)) {
+            throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
+                    groupId, streamId));
+        }
 
-        // push information
+        String masterAddress = commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL);
+        PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo();
+        InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+        final SourceResponse sourceResponse = sourceList.get(0);
+        SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean, groupInfo,
+                streamInfo, sourceResponse, sinkResponse);
+
+        SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse);
         return new DataFlowInfo(sinkResponse.getId(), sourceInfo, sinkInfo);
     }
 
     /**
-     * Get source info
+     * Get source info for DataFlowInfo
      */
-    private SourceInfo getSourceInfo(InlongGroupInfo groupInfo,
-            SinkResponse hiveResponse, List<StreamSinkFieldEntity> fieldList) {
+    @Deprecated
+    private SourceInfo getSourceInfo(InlongGroupInfo groupInfo, String streamId,
+            List<StreamSinkFieldEntity> fieldList) {
         DeserializationInfo deserializationInfo = null;
         String groupId = groupInfo.getInlongGroupId();
-        String streamId = hiveResponse.getInlongStreamId();
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+
         boolean isDbType = Constant.DATA_SOURCE_DB.equals(streamInfo.getDataType());
         if (!isDbType) {
             // FILE and auto push source, the data format is TEXT or KEY-VALUE, temporarily use InLongMsgCsv
@@ -172,15 +187,14 @@ public class PushSortConfigListener implements SortOperateListener {
                     escape = info.getDataEscapeChar().charAt(0);
                 }*/
                 // Whether to delete the first separator, the default is false for the time being
-                deserializationInfo = new InLongMsgCsvDeserializationInfo(hiveResponse.getInlongStreamId(), separator);
+                deserializationInfo = new InLongMsgCsvDeserializationInfo(streamId, separator);
             }
         }
 
         // The number and order of the source fields must be the same as the target fields
         SourceInfo sourceInfo = null;
         // Get the source field, if there is no partition field in source, add the partition field to the end
-        List<FieldInfo> sourceFields = getSourceFields(fieldList);
-
+        // List<FieldInfo> sourceFields = getSourceFields(fieldList);
         String middleWare = groupInfo.getMiddlewareType();
         if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middleWare)) {
             String masterAddress = commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL);
@@ -188,8 +202,8 @@ public class PushSortConfigListener implements SortOperateListener {
             String topic = groupInfo.getMqResourceObj();
             // The consumer group name is: taskName_topicName_consumer_group
             String consumerGroup = clusterBean.getAppName() + "_" + topic + "_consumer_group";
-            sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup,
-                    deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
+            // sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup,
+            //         deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
         } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middleWare)) {
             PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
             String tenant = clusterBean.getDefaultTenant();
@@ -197,30 +211,13 @@ public class PushSortConfigListener implements SortOperateListener {
             if (StringUtils.isNotEmpty(pulsarInfo.getTenant())) {
                 tenant = pulsarInfo.getTenant();
             }
-            sourceInfo = SourceInfoUtils.createPulsarSourceInfo(groupInfo, streamInfo.getMqResourceObj(),
-                    deserializationInfo, sourceFields, clusterBean.getAppName(),
-                    pulsarClusterInfo, tenant);
+            // sourceInfo = SourceInfoUtils.createPulsarSourceInfo(groupInfo, streamInfo.getMqResourceObj(),
+            //         deserializationInfo, sourceFields, clusterBean.getAppName(),
+            //         pulsarClusterInfo, tenant);
         }
         return sourceInfo;
     }
 
-    /**
-     * Get source field list
-     * TODO  support BuiltInField
-     */
-    private List<FieldInfo> getSourceFields(List<StreamSinkFieldEntity> fieldList) {
-        List<FieldInfo> fieldInfoList = new ArrayList<>();
-        for (StreamSinkFieldEntity field : fieldList) {
-            FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getSourceFieldType().toLowerCase());
-            String fieldName = field.getSourceFieldName();
-
-            FieldInfo fieldInfo = new FieldInfo(fieldName, formatInfo);
-            fieldInfoList.add(fieldInfo);
-        }
-
-        return fieldInfoList;
-    }
-
     @Override
     public boolean async() {
         return false;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
index ddfca00..d6c529e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -27,8 +26,10 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 import org.springframework.stereotype.Component;
 
+/**
+ * Event selector for whether ZooKeeper is disabled.
+ */
 @Component
-@Slf4j
 public class ZkDisabledEventSelector implements EventSelector {
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
index ba80aec..a8ee6e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
@@ -26,8 +25,10 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 import org.springframework.stereotype.Component;
 
+/**
+ * Event selector for whether ZooKeeper is enabled.
+ */
 @Component
-@Slf4j
 public class ZkEnabledEventSelector implements EventSelector {
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index f2c77d2..55d76f3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -38,62 +38,76 @@ import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo;
 import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 import org.springframework.util.Assert;
 
+/**
+ * Utils for Serialization and Deserialization info
+ */
 public class SerializationUtils {
 
+    /**
+     * Create deserialization info
+     */
     public static DeserializationInfo createDeserializationInfo(SourceResponse sourceResponse,
             InlongStreamInfo streamInfo) {
         SourceType sourceType = SourceType.forType(sourceResponse.getSourceType());
         switch (sourceType) {
             case BINLOG:
-                return forBinlog((BinlogSourceResponse) sourceResponse, streamInfo);
+                return deserializeForBinlog((BinlogSourceResponse) sourceResponse);
             case KAFKA:
-                return forKafka((KafkaSourceResponse) sourceResponse, streamInfo);
+                return deserializeForKafka((KafkaSourceResponse) sourceResponse, streamInfo);
             case FILE:
-                return forFile(sourceResponse, streamInfo);
+                return deserializeForFile(sourceResponse, streamInfo);
             default:
-                throw new IllegalArgumentException(String.format("Unsupport sourceType for Inlong:%s", sourceType));
+                throw new IllegalArgumentException(String.format("Unsupported sourceType: %s", sourceType));
         }
     }
 
-    public static SerializationInfo createSerializationInfo(SourceResponse sourceResponse, SinkResponse sinkResponse,
-            InlongStreamInfo inlongStreamInfo) {
+    /**
+     * Create serialization info
+     */
+    public static SerializationInfo createSerializationInfo(SourceResponse sourceResponse, SinkResponse sinkResponse) {
         SinkType sinkType = SinkType.forType(sinkResponse.getSinkType());
         switch (sinkType) {
             case HIVE:
                 return null;
             case KAFKA:
-                return forKafka(sourceResponse, (KafkaSinkResponse) sinkResponse, inlongStreamInfo);
+                return serializeForKafka(sourceResponse, (KafkaSinkResponse) sinkResponse);
             default:
-                throw new IllegalArgumentException(String.format("Unsupport sinkType for Inlong:%s", sinkType));
+                throw new IllegalArgumentException(String.format("Unsupported sinkType: %s", sinkType));
         }
     }
 
-    public static DeserializationInfo forBinlog(BinlogSourceResponse binlogSourceResponse,
-            InlongStreamInfo streamInfo) {
-        return new DebeziumDeserializationInfo(true, binlogSourceResponse.getTimestampFormatStandard());
+    /**
+     * Get deserialization info for Binlog
+     */
+    private static DeserializationInfo deserializeForBinlog(BinlogSourceResponse sourceResponse) {
+        return new DebeziumDeserializationInfo(true, sourceResponse.getTimestampFormatStandard());
     }
 
-    public static DeserializationInfo forKafka(KafkaSourceResponse kafkaSourceResponse,
-            InlongStreamInfo streamInfo) {
-        String serializationType = kafkaSourceResponse.getSerializationType();
+    /**
+     * Get deserialization info for Kafka
+     */
+    private static DeserializationInfo deserializeForKafka(KafkaSourceResponse source, InlongStreamInfo stream) {
+        String serializationType = source.getSerializationType();
         DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
         switch (dataType) {
             case CSV:
-                char seperator = streamInfo.getDataSeparator().toCharArray()[0];
-                return new CsvDeserializationInfo(seperator);
+                char separator = stream.getDataSeparator().toCharArray()[0];
+                return new CsvDeserializationInfo(separator);
             case AVRO:
                 return new AvroDeserializationInfo();
             case JSON:
                 return new JsonDeserializationInfo();
             default:
                 throw new IllegalArgumentException(
-                        String.format("Unsupport serializationType for Kafka source:%s", serializationType));
+                        String.format("Unsupported serializationType for Kafka source: %s", serializationType));
         }
     }
 
-    public static SerializationInfo forKafka(SourceResponse sourceResponse, KafkaSinkResponse kafkaSinkResponse,
-            InlongStreamInfo streamInfo) {
-        String serializationType = kafkaSinkResponse.getSerializationType();
+    /**
+     * Get serialization info for Kafka
+     */
+    private static SerializationInfo serializeForKafka(SourceResponse sourceResponse, KafkaSinkResponse sinkResponse) {
+        String serializationType = sinkResponse.getSerializationType();
         DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
         switch (dataType) {
             case AVRO:
@@ -104,31 +118,33 @@ public class SerializationUtils {
                 return new CanalSerializationInfo();
             case DEBEZIUM_JSON:
                 Assert.isInstanceOf(BinlogSourceResponse.class, sourceResponse,
-                        "Unsupport serializationType for Kafka;");
-                BinlogSourceResponse binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
-                return new DebeziumSerializationInfo(binlogSourceResponse.getTimestampFormatStandard(),
+                        "Unsupported serializationType for Kafka");
+                BinlogSourceResponse binlogSource = (BinlogSourceResponse) sourceResponse;
+                return new DebeziumSerializationInfo(binlogSource.getTimestampFormatStandard(),
                         "FAIL", "", false);
             default:
                 throw new IllegalArgumentException(
-                        String.format("Unsupport serializationType for Kafka sink:%s", serializationType));
+                        String.format("Unsupported serializationType for Kafka sink: %s", serializationType));
         }
     }
 
-    public static DeserializationInfo forFile(SourceResponse sourceResponse,
-            InlongStreamInfo streamInfo) {
+    /**
+     * Get deserialization info for File
+     */
+    private static DeserializationInfo deserializeForFile(SourceResponse sourceResponse, InlongStreamInfo streamInfo) {
         String serializationType = sourceResponse.getSerializationType();
         DataTypeEnum dataType = DataTypeEnum.forName(serializationType);
         switch (dataType) {
             case CSV:
-                char seperator = streamInfo.getDataSeparator().toCharArray()[0];
-                return new CsvDeserializationInfo(seperator);
+                char separator = streamInfo.getDataSeparator().toCharArray()[0];
+                return new CsvDeserializationInfo(separator);
             case AVRO:
                 return new AvroDeserializationInfo();
             case JSON:
                 return new JsonDeserializationInfo();
             default:
                 throw new IllegalArgumentException(
-                        String.format("Unsupport type for File source:%s", serializationType));
+                        String.format("Unsupported type for File source:%s", serializationType));
         }
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index 0583ab1..1edb0a8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -28,12 +28,8 @@ import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.sort.formats.common.FormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.SerializationInfo;
 import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
@@ -67,45 +63,39 @@ public class SinkInfoUtils {
         PARTITION_TIME_UNIT_MAP.put("I", TimeUnit.MINUTES);
     }
 
-    public static SinkInfo createSinkInfo(SinkResponse sinkResponse) {
-        return createSinkInfo(null, null, sinkResponse);
-    }
-
+    /**
+     * Create sink info for DataFlowInfo.
+     */
     public static SinkInfo createSinkInfo(SourceResponse sourceResponse, SinkResponse sinkResponse) {
-        return createSinkInfo(null, sourceResponse, sinkResponse);
-    }
-
-    public static SinkInfo createSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
-            SinkResponse sinkResponse) {
+        boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
         String sinkType = sinkResponse.getSinkType();
         SinkInfo sinkInfo;
         if (SinkType.forType(sinkType) == SinkType.HIVE) {
-            sinkInfo = createHiveSinkInfo(sourceResponse, (HiveSinkResponse) sinkResponse);
+            sinkInfo = createHiveSinkInfo(isAllMigration, (HiveSinkResponse) sinkResponse);
         } else if (SinkType.forType(sinkType) == SinkType.KAFKA) {
-            sinkInfo = createKafkaSinkInfo(inlongStreamInfo, sourceResponse, (KafkaSinkResponse) sinkResponse);
+            sinkInfo = createKafkaSinkInfo(isAllMigration, sourceResponse, (KafkaSinkResponse) sinkResponse);
         } else if (SinkType.forType(sinkType) == SinkType.CLICKHOUSE) {
             sinkInfo = createClickhouseSinkInfo((ClickHouseSinkResponse) sinkResponse);
         } else {
-            throw new RuntimeException(
-                    String.format("SinkType:{} not support in CreateSortConfigListener", sinkType));
+            throw new RuntimeException(String.format("Unsupported SinkType {%s}", sinkType));
         }
         return sinkInfo;
     }
 
     private static ClickHouseSinkInfo createClickhouseSinkInfo(ClickHouseSinkResponse sinkResponse) {
         if (StringUtils.isEmpty(sinkResponse.getJdbcUrl())) {
-            throw new RuntimeException(String.format("clickHouseSink={} server url cannot be empty", sinkResponse));
+            throw new RuntimeException(String.format("ClickHouse={%s} server url cannot be empty", sinkResponse));
         } else if (CollectionUtils.isEmpty(sinkResponse.getFieldList())) {
-            throw new RuntimeException(String.format("clickHouseSink={} fields cannot be empty", sinkResponse));
+            throw new RuntimeException(String.format("ClickHouse={%s} fields cannot be empty", sinkResponse));
         } else if (StringUtils.isEmpty(sinkResponse.getTableName())) {
-            throw new RuntimeException(String.format("clickHouseSink={} table name cannot be empty", sinkResponse));
+            throw new RuntimeException(String.format("ClickHouse={%s} table name cannot be empty", sinkResponse));
         } else if (StringUtils.isEmpty(sinkResponse.getDatabaseName())) {
-            throw new RuntimeException(String.format("clickHouseSink={} database name cannot be empty", sinkResponse));
+            throw new RuntimeException(String.format("ClickHouse={%s} database name cannot be empty", sinkResponse));
         }
         if (sinkResponse.getDistributedTable() == null) {
-            throw new RuntimeException(
-                    String.format("clickHouseSink={} distribute is or not cannot be empty", sinkResponse));
+            throw new RuntimeException(String.format("ClickHouse={%s} distribute cannot be empty", sinkResponse));
         }
+
         ClickHouseSinkInfo.PartitionStrategy partitionStrategy;
         if (PartitionStrategy.BALANCE.name().equalsIgnoreCase(sinkResponse.getPartitionStrategy())) {
             partitionStrategy = PartitionStrategy.BALANCE;
@@ -116,6 +106,7 @@ public class SinkInfoUtils {
         } else {
             partitionStrategy = PartitionStrategy.RANDOM;
         }
+
         List<FieldInfo> fieldInfoList = getClickHouseSinkFields(sinkResponse.getFieldList());
         return new ClickHouseSinkInfo(sinkResponse.getJdbcUrl(), sinkResponse.getDatabaseName(),
                 sinkResponse.getTableName(), sinkResponse.getUsername(), sinkResponse.getPassword(),
@@ -125,28 +116,30 @@ public class SinkInfoUtils {
                 sinkResponse.getWriteMaxRetryTimes());
     }
 
-    private static KafkaSinkInfo createKafkaSinkInfo(InlongStreamInfo inlongStreamInfo, SourceResponse sourceResponse,
-            KafkaSinkResponse kafkaSinkResponse) {
+    private static KafkaSinkInfo createKafkaSinkInfo(boolean isAllMigration, SourceResponse sourceResponse,
+            KafkaSinkResponse sinkResponse) {
         List<FieldInfo> fieldInfoList = Lists.newArrayList();
-        if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
-            fieldInfoList.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
-                    BuiltInField.MYSQL_METADATA_DATA));
+        if (isAllMigration) {
+            fieldInfoList.add(SourceInfoUtils.getAllMigrationBuiltInField());
         } else {
-            fieldInfoList = getSinkFields(kafkaSinkResponse.getFieldList(), null);
+            fieldInfoList = getSinkFields(sinkResponse.getFieldList(), null);
         }
-        String addressUrl = kafkaSinkResponse.getAddress();
-        String topicName = kafkaSinkResponse.getTopicName();
+        String addressUrl = sinkResponse.getAddress();
+        String topicName = sinkResponse.getTopicName();
         SerializationInfo serializationInfo = SerializationUtils.createSerializationInfo(sourceResponse,
-                kafkaSinkResponse, inlongStreamInfo);
+                sinkResponse);
         return new KafkaSinkInfo(fieldInfoList.toArray(new FieldInfo[0]), addressUrl, topicName, serializationInfo);
     }
 
-    private static HiveSinkInfo createHiveSinkInfo(SourceResponse sourceResponse, HiveSinkResponse hiveInfo) {
+    /**
+     * Create Hive sink info.
+     */
+    private static HiveSinkInfo createHiveSinkInfo(boolean isAllMigration, HiveSinkResponse hiveInfo) {
         if (hiveInfo.getJdbcUrl() == null) {
-            throw new RuntimeException(String.format("hiveSink={} server url cannot be empty", hiveInfo));
+            throw new RuntimeException(String.format("HiveSink={%s} server url cannot be empty", hiveInfo));
         }
         if (CollectionUtils.isEmpty(hiveInfo.getFieldList())) {
-            throw new RuntimeException(String.format("hiveSink={} fields cannot be empty", hiveInfo));
+            throw new RuntimeException(String.format("HiveSink={%s} fields cannot be empty", hiveInfo));
         }
         // Use the field separator in Hive, the default is TextFile
         Character separator = (char) Integer.parseInt(hiveInfo.getDataSeparator());
@@ -198,9 +191,8 @@ public class SinkInfoUtils {
 
         // Get the sink field, if there is no partition field in the source field, add the partition field to the end
         List<FieldInfo> fieldInfoList = Lists.newArrayList();
-        if (SourceInfoUtils.isBinlogMigrationSource(sourceResponse)) {
-            fieldInfoList.add(new BuiltInFieldInfo("DATABASE_MIGRATION", StringFormatInfo.INSTANCE,
-                    BuiltInField.MYSQL_METADATA_DATA));
+        if (isAllMigration) {
+            fieldInfoList.add(SourceInfoUtils.getAllMigrationBuiltInField());
         } else {
             fieldInfoList = getSinkFields(hiveInfo.getFieldList(), hiveInfo.getPrimaryPartition());
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 59c0386..f850f94 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -17,25 +17,62 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort.util;
 
-import java.util.Locale;
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
 import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
-
-import java.util.List;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
 import org.apache.inlong.sort.protocol.source.TDMQPulsarSourceInfo;
+import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 
+/**
+ * Utils for source info
+ */
 public class SourceInfoUtils {
 
-    public static boolean isBinlogMigrationSource(SourceResponse sourceResponse) {
+    /**
+     * Built-in field map: key is the field name, value is the corresponding built-in field
+     */
+    public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new HashMap<>();
+
+    static {
+        BUILT_IN_FIELD_MAP.put("data_time", BuiltInField.DATA_TIME);
+        BUILT_IN_FIELD_MAP.put("database", BuiltInField.MYSQL_METADATA_DATABASE);
+        BUILT_IN_FIELD_MAP.put("table", BuiltInField.MYSQL_METADATA_TABLE);
+        BUILT_IN_FIELD_MAP.put("event_time", BuiltInField.MYSQL_METADATA_EVENT_TIME);
+        BUILT_IN_FIELD_MAP.put("is_ddl", BuiltInField.MYSQL_METADATA_IS_DDL);
+        BUILT_IN_FIELD_MAP.put("event_type", BuiltInField.MYSQL_METADATA_EVENT_TYPE);
+    }
+
+    /**
+     * Whether the source is a binlog source with all-migration enabled.
+     */
+    public static boolean isBinlogAllMigration(SourceResponse sourceResponse) {
         if (SourceType.BINLOG.getType().equalsIgnoreCase(sourceResponse.getSourceType())) {
             BinlogSourceResponse binlogSourceResponse = (BinlogSourceResponse) sourceResponse;
             return binlogSourceResponse.isAllMigration();
@@ -43,27 +80,111 @@ public class SourceInfoUtils {
         return false;
     }
 
-    public static SourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo,
-            String pulsarTopic,
-            DeserializationInfo deserializationInfo,
-            List<FieldInfo> fieldInfos,
-            String appName,
-            PulsarClusterInfo pulsarClusterInfo,
-            String tenant) {
+    /**
+     * Get all migration built-in field for binlog source.
+     */
+    public static BuiltInFieldInfo getAllMigrationBuiltInField() {
+        return new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
+                BuiltInField.MYSQL_METADATA_DATA);
+    }
+
+    /**
+     * Create source info for DataFlowInfo.
+     */
+    public static SourceInfo createSourceInfo(PulsarClusterInfo pulsarCluster, String masterAddress,
+            ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            SourceResponse sourceResponse, SinkResponse sinkResponse) {
+
+        List<FieldInfo> fieldInfos = Lists.newArrayList();
+        boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
+        if (isAllMigration) {
+            fieldInfos.add(SourceInfoUtils.getAllMigrationBuiltInField());
+        } else {
+            if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
+                fieldInfos = getSourceFields(sinkResponse.getFieldList());
+            }
+        }
+
+        String middleWareType = groupInfo.getMiddlewareType();
+        DeserializationInfo deserializationInfo = SerializationUtils.createDeserializationInfo(sourceResponse,
+                streamInfo);
+        SourceInfo sourceInfo;
+        if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
+            sourceInfo = createPulsarSourceInfo(pulsarCluster, clusterBean, groupInfo, streamInfo, deserializationInfo,
+                    fieldInfos);
+        } else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) {
+            // InlongGroupInfo groupInfo, String masterAddress,
+            sourceInfo = createTubeSourceInfo(groupInfo, masterAddress, clusterBean, deserializationInfo, fieldInfos);
+        } else {
+            throw new WorkflowListenerException(String.format("Unsupported middleware {%s}", middleWareType));
+        }
+
+        return sourceInfo;
+    }
+
+    /**
+     * Create source info for Pulsar
+     */
+    private static SourceInfo createPulsarSourceInfo(PulsarClusterInfo pulsarCluster, ClusterBean clusterBean,
+            InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            DeserializationInfo deserializationInfo, List<FieldInfo> fieldInfos) {
+        String topicName = streamInfo.getMqResourceObj();
+        InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
+        String tenant = clusterBean.getDefaultTenant();
+        if (StringUtils.isNotEmpty(pulsarInfo.getTenant())) {
+            tenant = pulsarInfo.getTenant();
+        }
+
         final String namespace = groupInfo.getMqResourceObj();
-        // Full name of Topic in Pulsar
-        final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
-        final String consumerGroup = appName + "_" + pulsarTopic + "_consumer_group";
-        String type = pulsarClusterInfo.getType();
+        // Full name of topic in Pulsar
+        final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + topicName;
+        final String consumerGroup = clusterBean.getAppName() + "_" + topicName + "_consumer_group";
+        FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]);
+        String type = pulsarCluster.getType();
         if (StringUtils.isNotEmpty(type) && type.toUpperCase(Locale.ROOT).contains(Constant.MIDDLEWARE_TDMQ)) {
-            return new TDMQPulsarSourceInfo(pulsarClusterInfo.getBrokerServiceUrl(),
-                    fullTopicName, consumerGroup, pulsarClusterInfo.getToken(), deserializationInfo,
-                    fieldInfos.toArray(new FieldInfo[0]));
+            return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(),
+                    fullTopicName, consumerGroup, pulsarCluster.getToken(), deserializationInfo, fieldInfosArr);
         } else {
-            return new PulsarSourceInfo(pulsarClusterInfo.getAdminUrl(), pulsarClusterInfo.getBrokerServiceUrl(),
-                    fullTopicName, consumerGroup, deserializationInfo, fieldInfos.toArray(new FieldInfo[0]),
-                    pulsarClusterInfo.getToken());
+            return new PulsarSourceInfo(pulsarCluster.getAdminUrl(), pulsarCluster.getBrokerServiceUrl(),
+                    fullTopicName, consumerGroup, deserializationInfo, fieldInfosArr, pulsarCluster.getToken());
         }
+    }
 
+    /**
+     * Create source info for TubeMQ
+     */
+    private static TubeSourceInfo createTubeSourceInfo(InlongGroupInfo groupInfo, String masterAddress,
+            ClusterBean clusterBean, DeserializationInfo deserializationInfo, List<FieldInfo> fieldInfos) {
+        Preconditions.checkNotNull(masterAddress, "tube cluster address cannot be empty");
+        String topic = groupInfo.getMqResourceObj();
+        String consumerGroup = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+        return new TubeSourceInfo(topic, masterAddress, consumerGroup, deserializationInfo,
+                fieldInfos.toArray(new FieldInfo[0]));
     }
+
+    /**
+     * Get source field list.
+     *
+     * TODO 1. Support partition field, 2. Add is_metadata field in StreamSinkFieldEntity
+     */
+    private static List<FieldInfo> getSourceFields(List<SinkFieldResponse> fieldList) {
+        List<FieldInfo> fieldInfoList = new ArrayList<>();
+        for (SinkFieldResponse field : fieldList) {
+            FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getSourceFieldType().toLowerCase());
+            String fieldName = field.getSourceFieldName();
+
+            FieldInfo fieldInfo;
+            // If the field name matches a built-in field, create a built-in field info
+            BuiltInField builtInField = SourceInfoUtils.BUILT_IN_FIELD_MAP.get(fieldName);
+            if (builtInField == null) {
+                fieldInfo = new FieldInfo(fieldName, formatInfo);
+            } else {
+                fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
+            }
+            fieldInfoList.add(fieldInfo);
+        }
+
+        return fieldInfoList;
+    }
+
 }