You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/01 08:52:25 UTC

[incubator-inlong] branch master updated: [INlONG-2791][Manager] Optimize manager client APIs (#2792)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 76b8b30  [INlONG-2791][Manager] Optimize manager client APIs (#2792)
76b8b30 is described below

commit 76b8b30eb0bc195d55680a5187af3239e73bf5dc
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Tue Mar 1 16:52:16 2022 +0800

    [INlONG-2791][Manager] Optimize manager client APIs (#2792)
    
    * [INlONG-2791][Manager] Optimize manager client APIs
    
    * [INlONG-2791][Manager] Use enums of common in manager client
    
    * [INlONG-2791][Manager] Fix sourceType and sinkType conflicts
    
    * [INlONG-2791][Manager] Add errorMsg for InlongGroup
---
 .../inlong/manager/client/api/InlongClient.java    |  10 +-
 .../inlong/manager/client/api/InlongGroup.java     |   8 +-
 .../inlong/manager/client/api/InlongGroupInfo.java |  15 +--
 .../inlong/manager/client/api/MqBaseConf.java      |   9 ++
 .../inlong/manager/client/api/SortBaseConf.java    |  26 +++-
 .../inlong/manager/client/api/StreamSink.java      |  14 +-
 .../inlong/manager/client/api/StreamSource.java    |  15 +--
 .../manager/client/api/auth/Authentication.java    |   9 ++
 .../manager/client/api/impl/BlankInlongGroup.java  |  63 +++++++++
 .../api/impl/DefaultInlongStreamBuilder.java       |   6 +-
 .../manager/client/api/impl/InlongClientImpl.java  |  30 ++++-
 .../manager/client/api/impl/InlongGroupImpl.java   |  14 +-
 .../inlong/manager/client/api/sink/HiveSink.java   |   1 +
 .../manager/client/api/source/KafkaSource.java     |   1 +
 .../client/api/source/MySQLBinlogSource.java       |   3 +-
 .../manager/client/api/source/MySQLSource.java     |   3 +-
 .../client/api/util/InlongGroupTransfer.java       | 146 ++++++++++++++++++++-
 .../api/util/InlongStreamSourceTransfer.java       |   6 +-
 .../client/api/util/InlongStreamTransfer.java      |   4 +-
 .../inlong/manager/common/enums/SourceType.java    |  23 ++--
 .../manager/common/pojo/sink/SinkPageRequest.java  |   3 +-
 .../common/pojo/source/SourcePageRequest.java      |   3 +-
 .../common/settings/InlongGroupSettings.java       |   2 +
 .../dao/mapper/StreamSourceEntityMapper.java       |   6 +-
 .../resources/mappers/StreamSinkEntityMapper.xml   |   4 +-
 .../resources/mappers/StreamSourceEntityMapper.xml |   6 +-
 .../service/core/impl/AgentTaskServiceImpl.java    |  18 ++-
 .../service/sink/StreamSinkServiceImpl.java        |  41 +++++-
 .../service/source/StreamSourceServiceImpl.java    |  30 ++++-
 29 files changed, 421 insertions(+), 98 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index f844701..18a03ef 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -17,10 +17,8 @@
 
 package org.apache.inlong.manager.client.api;
 
-import com.github.pagehelper.PageInfo;
+import java.util.List;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 
 /**
  * An interface to manipulate Inlong Cluster
@@ -73,15 +71,15 @@ public interface InlongClient {
      * @return the list
      * @throws Exception the exception
      */
-    PageInfo<InlongGroupListResponse> listGroup(String keyword, int status, int pageNum, int pageSize) throws Exception;
+    List<InlongGroup> listGroup(String expr, int status, int pageNum, int pageSize) throws Exception;
 
     /**
      * Gets group.
      *
-     * @param groupId the group name
+     * @param groupName the group name
      * @return the group
      * @throws Exception the exception
      */
-    InlongGroupRequest getGroup(String groupId) throws Exception;
+    InlongGroup getGroup(String groupName) throws Exception;
 
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index ed85871..b57dd2c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -29,6 +29,13 @@ public interface InlongGroup {
     InlongStreamBuilder createStream(InlongStreamConf streamConf) throws Exception;
 
     /**
+     * Create snapshot for Inlong group
+     * @return the inlong group info
+     * @throws Exception the exception
+     */
+    InlongGroupInfo snapshot() throws Exception;
+
+    /**
      * Init inlong group.
      * This operation will init all physical resources needed to start a stream group
      * Must be operated after all inlong streams were created;
@@ -64,5 +71,4 @@ public interface InlongGroup {
      * @return inlong stream contained in this group
      */
     List<InlongStream> listStreams() throws Exception;
-
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupInfo.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupInfo.java
index 0ab09e7..883f0f5 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupInfo.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupInfo.java
@@ -17,16 +17,14 @@
 
 package org.apache.inlong.manager.client.api;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.util.Map;
 import lombok.Data;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.util.AssertUtil;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
 @Data
 public class InlongGroupInfo implements Serializable {
 
@@ -38,7 +36,8 @@ public class InlongGroupInfo implements Serializable {
 
     private Map<String, InlongStream> inlongStreamMap;
 
-    private List<String> errMsg;
+    // key: taskName, value: errorMsg
+    private Map<String, String> errMsg;
 
     private InlongGroupState state;
 
@@ -49,14 +48,14 @@ public class InlongGroupInfo implements Serializable {
         this.groupName = groupInfo.getName();
         this.groupConf = streamGroupConf;
         this.inlongStreamMap = groupContext.getStreamMap();
-        this.errMsg = Lists.newArrayList();
+        this.errMsg = Maps.newHashMap();
         this.state = InlongGroupState.parseByBizStatus(groupInfo.getStatus());
     }
 
     public enum InlongGroupState {
         INIT, FAIL, START, SUSPEND, RESTART, DELETE;
 
-        // Reference to  org.apache.inlong.manager.common.enums.EntityStatus code
+        // Reference to org.apache.inlong.manager.common.enums.GroupState code
         public static InlongGroupState parseByBizStatus(int bizCode) {
 
             switch (bizCode) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
index 8a500b3..73c0e2b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
@@ -37,6 +37,15 @@ public abstract class MqBaseConf implements Serializable {
         PULSAR,
         TUBE,
         NONE;
+
+        public static MqType forType(String type) {
+            for (MqType mqType : values()) {
+                if (mqType.name().equals(type)) {
+                    return mqType;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Unsupport queue=%s for Inlong", type));
+        }
     }
 
     @ApiModelProperty("The number of partitions of Topic, 1-20")
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
index b670f5a..608870d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
@@ -25,9 +25,29 @@ import lombok.Data;
 public abstract class SortBaseConf {
 
     public enum SortType {
-        FLINK,
-        LOCAL,
-        USER_DEFINED;
+        FLINK("flink"),
+        LOCAL("local"),
+        USER_DEFINED("user_defined");
+
+        private String type;
+
+        public String getType() {
+            return this.type;
+        }
+
+        SortType(String type) {
+            this.type = type;
+        }
+
+        public static SortType forType(String type) {
+            for (SortType sortType : values()) {
+                if (sortType.getType().equals(type)) {
+                    return sortType;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Unsupport type=%s for Inlong", type));
+        }
+
     }
 
     public abstract SortType getType();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index 689ed7c..fd25024 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -20,24 +20,12 @@ package org.apache.inlong.manager.client.api;
 import io.swagger.annotations.ApiModel;
 import java.util.List;
 import lombok.Data;
+import org.apache.inlong.manager.common.enums.SinkType;
 
 @Data
 @ApiModel("Stream sink configuration")
 public abstract class StreamSink {
 
-    public enum SinkType {
-        HIVE, ES, KAFKA;
-
-        public static SinkType forType(String type) {
-            for (SinkType sinkType : values()) {
-                if (sinkType.name().equals(type)) {
-                    return sinkType;
-                }
-            }
-            throw new IllegalArgumentException(String.format("Illegal sink type=%s for Inlong", type));
-        }
-    }
-
     public abstract SinkType getSinkType();
 
     public abstract List<StreamField> getStreamFields();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
index b588e66..80f09a9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -19,25 +19,12 @@ package org.apache.inlong.manager.client.api;
 
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
+import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
 @ApiModel("Stream source configuration")
 public abstract class StreamSource {
 
-    public enum SourceType {
-        FILE, KAFKA, DB, BINLOG;
-
-        public static SourceType forType(String type) {
-            for (SourceType sourceType : values()) {
-                if (sourceType.name().equals(type)) {
-                    return sourceType;
-                }
-            }
-            throw new IllegalArgumentException(
-                    String.format("Unsupport source type=%s for Inlong", type));
-        }
-    }
-
     public enum SyncType {
         FULL, INCREMENT
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/Authentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/Authentication.java
index 71433fe..443237a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/Authentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/Authentication.java
@@ -32,6 +32,15 @@ public interface Authentication {
         public String toString() {
             return this.name().toLowerCase(Locale.ROOT);
         }
+
+        public static AuthType forType(String type) {
+            for (AuthType authType : values()) {
+                if (authType.name().equals(type.toUpperCase())) {
+                    return authType;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Unsupported authType=%s for Inlong", type));
+        }
     }
 
     AuthType getAuthType();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
new file mode 100644
index 0000000..5c0c9b6
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import java.util.List;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupInfo;
+import org.apache.inlong.manager.client.api.InlongStream;
+import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.client.api.InlongStreamConf;
+
+public class BlankInlongGroup implements InlongGroup {
+
+    @Override
+    public InlongStreamBuilder createStream(InlongStreamConf streamConf) throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
+    public InlongGroupInfo snapshot() throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
+    public InlongGroupInfo init() throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
+    public InlongGroupInfo suspend() throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
+    public InlongGroupInfo restart() throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
+    public InlongGroupInfo delete() throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+
+    @Override
+    public List<InlongStream> listStreams() throws Exception {
+        throw new UnsupportedOperationException("Inlong group is not exists");
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index bcd1819..e491c44 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -27,15 +27,15 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
 import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.StreamSink;
-import org.apache.inlong.manager.client.api.StreamSink.SinkType;
 import org.apache.inlong.manager.client.api.StreamSource;
-import org.apache.inlong.manager.client.api.StreamSource.SourceType;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
 import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
 import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
 import org.apache.inlong.manager.client.api.util.GsonUtil;
 import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
 import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
@@ -137,7 +137,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
 
     private int initOrUpdateSource(SourceRequest sourceRequest) {
         String sourceType = sourceRequest.getSourceType();
-        if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
+        if (SourceType.KAFKA.name().equals(sourceType) || SourceType.DB_BINLOG.name().equals(sourceType)) {
             List<SourceListResponse> responses = managerClient.listSources(sourceRequest.getInlongGroupId(),
                     sourceRequest.getInlongStreamId(), sourceRequest.getSourceType());
             if (CollectionUtils.isEmpty(responses)) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 59f1f62..75a619b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -19,18 +19,23 @@ package org.apache.inlong.manager.client.api.impl;
 
 import com.github.pagehelper.PageInfo;
 import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupConf;
 import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 
@@ -74,16 +79,33 @@ public class InlongClientImpl implements InlongClient {
     }
 
     @Override
-    public PageInfo<InlongGroupListResponse> listGroup(String keyword, int status,
+    public List<InlongGroup> listGroup(String expr, int status,
             int pageNum, int pageSize) throws Exception {
         InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
-        return managerClient.listGroupInfo(keyword, status, pageNum, pageSize);
+        PageInfo<InlongGroupListResponse> responsePageInfo = managerClient.listGroupInfo(expr, status, pageNum,
+                pageSize);
+        if (CollectionUtils.isEmpty(responsePageInfo.getList())) {
+            return Lists.newArrayList();
+        } else {
+            return responsePageInfo.getList().stream().map(response -> {
+                String groupId = response.getInlongGroupId();
+                InlongGroupRequest request = managerClient.getGroupInfo(groupId);
+                InlongGroupConf groupConf = InlongGroupTransfer.parseGroupRequest(request);
+                return new InlongGroupImpl(groupConf, this);
+            }).collect(Collectors.toList());
+        }
     }
 
     @Override
-    public InlongGroupRequest getGroup(String groupId) throws Exception {
+    public InlongGroup getGroup(String groupName) throws Exception {
         InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
-        return managerClient.getGroupInfo(groupId);
+        final String groupId = "b_" + groupName;
+        InlongGroupRequest groupRequest = managerClient.getGroupInfo(groupId);
+        if (groupRequest == null) {
+            return new BlankInlongGroup();
+        }
+        InlongGroupConf groupConf = InlongGroupTransfer.parseGroupRequest(groupRequest);
+        return new InlongGroupImpl(groupConf, this);
     }
 
     private boolean checkConnectivity(String host, int port, int connectTimeout) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index dba26be..0c4624d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -43,6 +44,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -79,6 +81,11 @@ public class InlongGroupImpl implements InlongGroup {
     }
 
     @Override
+    public InlongGroupInfo snapshot() throws Exception {
+        return generateSnapshot(groupContext.getGroupRequest());
+    }
+
+    @Override
     public InlongGroupInfo init() throws Exception {
         WorkflowResult initWorkflowResult = managerClient.initInlongGroup(this.groupContext.getGroupRequest());
         List<TaskResponse> taskViews = initWorkflowResult.getNewTasks();
@@ -147,7 +154,12 @@ public class InlongGroupImpl implements InlongGroup {
         String inlongGroupId = currentBizInfo.getInlongGroupId();
         List<InlongStream> dataStreams = fetchDataStreams(inlongGroupId);
         dataStreams.stream().forEach(dataStream -> groupContext.setStream(dataStream));
-        return new InlongGroupInfo(groupContext, groupConf);
+        InlongGroupInfo groupInfo = new InlongGroupInfo(groupContext, groupConf);
+        List<EventLogView> logViews = managerClient.getInlongGroupError(inlongGroupId);
+        Map<String, String> errMsgs = logViews.stream().collect(
+                Collectors.toMap(EventLogView::getEvent, EventLogView::getException));
+        groupInfo.setErrMsg(errMsgs);
+        return groupInfo;
     }
 
     private List<InlongStream> fetchDataStreams(String groupId) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index df1667c..f93adfb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.client.api.DataSeparator;
 import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.enums.SinkType;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
index f447f9d..45b3249 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -24,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
 @AllArgsConstructor
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index be68273..0b3743a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -25,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
 @AllArgsConstructor
@@ -33,7 +34,7 @@ import org.apache.inlong.manager.client.api.StreamSource;
 public class MySQLBinlogSource extends StreamSource {
 
     @ApiModelProperty(value = "DataSource type", required = true)
-    private SourceType sourceType = SourceType.BINLOG;
+    private SourceType sourceType = SourceType.DB_BINLOG;
 
     @ApiModelProperty("SyncType for MySQL")
     private SyncType syncType;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
index 7395293..2846154 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
@@ -24,6 +24,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
 @AllArgsConstructor
@@ -32,7 +33,7 @@ import org.apache.inlong.manager.client.api.StreamSource;
 public class MySQLSource extends StreamSource {
 
     @ApiModelProperty(value = "DataSource type", required = true)
-    private SourceType sourceType = SourceType.DB;
+    private SourceType sourceType = SourceType.DB_SQL;
 
     @ApiModelProperty("SyncType for MySQL")
     private SyncType syncType;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index b614030..2e8a29c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -18,6 +18,11 @@
 package org.apache.inlong.manager.client.api.util;
 
 import com.google.common.collect.Lists;
+import com.google.gson.reflect.TypeToken;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
@@ -34,16 +39,143 @@ import org.apache.inlong.manager.client.api.auth.Authentication.AuthType;
 import org.apache.inlong.manager.client.api.auth.SecretTokenAuthentication;
 import org.apache.inlong.manager.client.api.auth.TokenAuthentication;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupMqExtBase;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.JsonUtils;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class InlongGroupTransfer {
 
+    public static InlongGroupConf parseGroupRequest(InlongGroupRequest groupRequest) {
+        InlongGroupConf inlongGroupConf = new InlongGroupConf();
+        inlongGroupConf.setGroupName(groupRequest.getName());
+        inlongGroupConf.setDescription(groupRequest.getDescription());
+        inlongGroupConf.setCnName(groupRequest.getCnName());
+        inlongGroupConf.setZookeeperEnabled(groupRequest.getZookeeperEnabled() == 1);
+        inlongGroupConf.setDailyRecords(Long.valueOf(groupRequest.getDailyRecords()));
+        inlongGroupConf.setPeakRecords(Long.valueOf(groupRequest.getPeakRecords()));
+        inlongGroupConf.setMqBaseConf(parseMqBaseConf(groupRequest));
+        inlongGroupConf.setSortBaseConf(parseSortBaseConf(groupRequest));
+        return inlongGroupConf;
+    }
+
+    public static MqBaseConf parseMqBaseConf(InlongGroupRequest inlongGroupRequest) {
+        InlongGroupMqExtBase mqExtBase = inlongGroupRequest.getMqExtInfo();
+        String middleWare = mqExtBase.getMiddlewareType();
+        MqType mqType = MqType.forType(middleWare);
+        switch (mqType) {
+            case NONE:
+                return MqBaseConf.BLANK_MQ_CONF;
+            case PULSAR:
+                return parsePulsarConf(inlongGroupRequest);
+            case TUBE:
+                return parseTubeConf(inlongGroupRequest);
+            default:
+                throw new RuntimeException(String.format("Illegal mqType=%s for Inlong", mqType));
+        }
+    }
+
+    public static SortBaseConf parseSortBaseConf(InlongGroupRequest groupRequest) {
+        List<InlongGroupExtInfo> groupExtInfos = groupRequest.getExtList();
+        if (CollectionUtils.isEmpty(groupExtInfos)) {
+            return null;
+        }
+        String type = null;
+        for (InlongGroupExtInfo extInfo : groupExtInfos) {
+            if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_TYPE)) {
+                type = extInfo.getKeyValue();
+                break;
+            }
+        }
+        if (type == null) {
+            return null;
+        }
+        SortType sortType = SortType.forType(type);
+        switch (sortType) {
+            case FLINK:
+                return parseFlinkSortConf(groupExtInfos);
+            case USER_DEFINED:
+                return parseUdf(groupExtInfos);
+            default:
+                throw new IllegalArgumentException(String.format("Unsupport sort type=%s for Inlong", sortType));
+        }
+    }
+
+    private static FlinkSortBaseConf parseFlinkSortConf(List<InlongGroupExtInfo> groupExtInfos) {
+        FlinkSortBaseConf sortBaseConf = new FlinkSortBaseConf();
+        for (InlongGroupExtInfo extInfo : groupExtInfos) {
+            if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_URL)) {
+                sortBaseConf.setServiceUrl(extInfo.getKeyValue());
+            }
+            if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_PROPERTIES)) {
+                Map<String, String> properties = GsonUtil.fromJson(extInfo.getKeyValue(),
+                        new TypeToken<Map<String, String>>() {
+                        }.getType());
+                sortBaseConf.setProperties(properties);
+            }
+        }
+        return sortBaseConf;
+    }
+
+    private static UserDefinedSortConf parseUdf(List<InlongGroupExtInfo> groupExtInfos) {
+        UserDefinedSortConf sortConf = new UserDefinedSortConf();
+        for (InlongGroupExtInfo extInfo : groupExtInfos) {
+            if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_NAME)) {
+                sortConf.setSortName(extInfo.getKeyValue());
+            }
+            if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_PROPERTIES)) {
+                Map<String, String> properties = GsonUtil.fromJson(extInfo.getKeyValue(),
+                        new TypeToken<Map<String, String>>() {
+                        }.getType());
+                sortConf.setProperties(properties);
+            }
+        }
+        return sortConf;
+    }
+
+    private static PulsarBaseConf parsePulsarConf(InlongGroupRequest groupRequest) {
+        PulsarBaseConf pulsarBaseConf = new PulsarBaseConf();
+        pulsarBaseConf.setNamespace(groupRequest.getMqResourceObj());
+        InlongGroupPulsarInfo inlongGroupPulsarInfo = (InlongGroupPulsarInfo) groupRequest.getMqExtInfo();
+        pulsarBaseConf.setAckQuorum(inlongGroupPulsarInfo.getAckQuorum());
+        pulsarBaseConf.setWriteQuorum(inlongGroupPulsarInfo.getWriteQuorum());
+        pulsarBaseConf.setEnsemble(inlongGroupPulsarInfo.getEnsemble());
+        pulsarBaseConf.setTtl(inlongGroupPulsarInfo.getTtl());
+        pulsarBaseConf.setRetentionTime(inlongGroupPulsarInfo.getRetentionTime());
+        pulsarBaseConf.setRetentionSize(inlongGroupPulsarInfo.getRetentionSize());
+        pulsarBaseConf.setRetentionSizeUnit(inlongGroupPulsarInfo.getRetentionSizeUnit());
+        pulsarBaseConf.setRetentionTimeUnit(inlongGroupPulsarInfo.getRetentionTimeUnit());
+        List<InlongGroupExtInfo> groupExtInfos = groupRequest.getExtList();
+        for (InlongGroupExtInfo extInfo : groupExtInfos) {
+            if (extInfo.getKeyName().equals(InlongGroupSettings.PULSAR_ADMIN_URL)) {
+                pulsarBaseConf.setPulsarAdminUrl(extInfo.getKeyValue());
+            }
+            if (extInfo.getKeyName().equals(InlongGroupSettings.PULSAR_SERVICE_URL)) {
+                pulsarBaseConf.setPulsarServiceUrl(extInfo.getKeyValue());
+            }
+        }
+        return pulsarBaseConf;
+    }
+
+    private static TubeBaseConf parseTubeConf(InlongGroupRequest groupRequest) {
+        TubeBaseConf tubeBaseConf = new TubeBaseConf();
+        tubeBaseConf.setGroupName(groupRequest.getMqResourceObj());
+        List<InlongGroupExtInfo> groupExtInfos = groupRequest.getExtList();
+        for (InlongGroupExtInfo extInfo : groupExtInfos) {
+            if (extInfo.getKeyName().equals(InlongGroupSettings.TUBE_CLUSTER_ID)) {
+                tubeBaseConf.setTubeClusterId(Integer.parseInt(extInfo.getKeyValue()));
+            }
+            if (extInfo.getKeyName().equals(InlongGroupSettings.TUBE_MANAGER_URL)) {
+                tubeBaseConf.setTubeManagerUrl(extInfo.getKeyValue());
+            }
+            if (extInfo.getKeyName().equals(InlongGroupSettings.TUBE_MASTER_URL)) {
+                tubeBaseConf.setTubeMasterUrl(extInfo.getKeyValue());
+            }
+        }
+        return tubeBaseConf;
+    }
+
     public static InlongGroupRequest createGroupInfo(InlongGroupConf groupConf) {
         InlongGroupRequest groupInfo = new InlongGroupRequest();
         AssertUtil.hasLength(groupConf.getGroupName(), "GroupName should not be empty");
@@ -165,7 +297,7 @@ public class InlongGroupTransfer {
         List<InlongGroupExtInfo> extInfos = new ArrayList<>();
         InlongGroupExtInfo sortType = new InlongGroupExtInfo();
         sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
-        sortType.setKeyValue(InlongGroupSettings.DEFAULT_SORT_TYPE);
+        sortType.setKeyValue(SortType.FLINK.getType());
         extInfos.add(sortType);
         if (flinkSortBaseConf.getAuthentication() != null) {
             Authentication authentication = flinkSortBaseConf.getAuthentication();
@@ -201,8 +333,12 @@ public class InlongGroupTransfer {
         List<InlongGroupExtInfo> extInfos = new ArrayList<>();
         InlongGroupExtInfo sortType = new InlongGroupExtInfo();
         sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
-        sortType.setKeyValue(userDefinedSortConf.getSortName());
+        sortType.setKeyValue(SortType.USER_DEFINED.getType());
         extInfos.add(sortType);
+        InlongGroupExtInfo sortName = new InlongGroupExtInfo();
+        sortName.setKeyName(InlongGroupSettings.SORT_NAME);
+        sortName.setKeyValue(userDefinedSortConf.getSortName());
+        extInfos.add(sortName);
         if (MapUtils.isNotEmpty(userDefinedSortConf.getProperties())) {
             InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
             flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 273464c..0aacb72 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -22,10 +22,10 @@ import com.google.common.base.Splitter;
 import java.util.List;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.StreamSource;
-import org.apache.inlong.manager.client.api.StreamSource.SourceType;
 import org.apache.inlong.manager.client.api.StreamSource.SyncType;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
 import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
@@ -41,7 +41,7 @@ public class InlongStreamSourceTransfer {
         switch (sourceType) {
             case KAFKA:
                 return createKafkaSourceRequest((KafkaSource) streamSource, streamInfo);
-            case BINLOG:
+            case DB_BINLOG:
                 return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
             default:
                 throw new RuntimeException(String.format("Unsupport source=%s for Inlong", sourceType));
@@ -51,7 +51,7 @@ public class InlongStreamSourceTransfer {
     public static StreamSource parseStreamSource(SourceListResponse sourceListResponse) {
         String type = sourceListResponse.getSourceType();
         SourceType sourceType = SourceType.forType(type);
-        if (sourceType == SourceType.BINLOG) {
+        if (sourceType == SourceType.DB_BINLOG) {
             return parseKafkaSource((KafkaSourceListResponse) sourceListResponse);
         } else if (sourceType == SourceType.KAFKA) {
             return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index e3f4234..d8924c0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -20,13 +20,13 @@ package org.apache.inlong.manager.client.api.util;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.client.api.sink.HiveSink;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
 import org.apache.inlong.manager.client.api.StreamField;
 import org.apache.inlong.manager.client.api.StreamField.FieldType;
 import org.apache.inlong.manager.client.api.StreamSink;
-import org.apache.inlong.manager.client.api.StreamSink.SinkType;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.client.api.sink.HiveSink;
+import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 1b4f0b4..5468154 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -18,18 +18,25 @@
 package org.apache.inlong.manager.common.enums;
 
 import java.util.Locale;
+import lombok.Getter;
+import org.apache.inlong.common.enums.TaskTypeEnum;
 
 public enum SourceType {
 
-    FILE("FILE"),
-    DB_SQL("DB_SQL"),
-    DB_BINLOG("DB_BINLOG"),
-    KAFKA("KAFKA");
+    FILE("FILE", TaskTypeEnum.FILE),
+    DB_SQL("DB_SQL", TaskTypeEnum.SQL),
+    DB_BINLOG("DB_BINLOG", TaskTypeEnum.BINLOG),
+    KAFKA("KAFKA", TaskTypeEnum.KAFKA);
 
+    @Getter
     private final String type;
 
-    SourceType(String type) {
+    @Getter
+    private final TaskTypeEnum taskType;
+
+    SourceType(String type, TaskTypeEnum taskType) {
         this.type = type;
+        this.taskType = taskType;
     }
 
     /**
@@ -37,17 +44,13 @@ public enum SourceType {
      */
     public static SourceType forType(String sourceType) {
         for (SourceType type : values()) {
-            if (type.name().equals(sourceType)) {
+            if (type.getType().equals(sourceType)) {
                 return type;
             }
         }
         throw new IllegalArgumentException(String.format("Illegal sink type for %s", sourceType));
     }
 
-    public String forType() {
-        return this.type;
-    }
-
     @Override
     public String toString() {
         return this.name().toUpperCase(Locale.ROOT);
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
index e0895d9..ac368e4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
@@ -40,8 +40,7 @@ public class SinkPageRequest extends PageRequest {
     @ApiModelProperty(value = "Inlong stream id")
     private String inlongStreamId;
 
-    @ApiModelProperty(value = "Sink type, such as HIVE", required = true)
-    @NotNull
+    @ApiModelProperty(value = "Sink type, such as HIVE")
     private String sinkType;
 
     @ApiModelProperty(value = "Key word")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourcePageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourcePageRequest.java
index 5d8a5f5..6db1b42 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourcePageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourcePageRequest.java
@@ -40,8 +40,7 @@ public class SourcePageRequest extends PageRequest {
     @ApiModelProperty(value = "Inlong stream id")
     private String inlongStreamId;
 
-    @NotNull
-    @ApiModelProperty(value = "Source type, such as FILE", required = true)
+    @ApiModelProperty(value = "Source type, such as FILE")
     private String sourceType;
 
     @ApiModelProperty(value = "Key word")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
index dd51641..9e0d988 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
@@ -76,6 +76,8 @@ public class InlongGroupSettings {
 
     public static String DEFAULT_SORT_TYPE = "flink";
 
+    public static String SORT_NAME = "sort.name";
+
     public static String SORT_URL = "sort.url";
 
     public static String SORT_AUTHENTICATION = "sort.authentication";
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index eff8215..0631499 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -17,15 +17,13 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
+import java.util.List;
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.springframework.stereotype.Repository;
 
-import java.util.List;
-
 @Repository
 public interface StreamSourceEntityMapper {
 
@@ -87,6 +85,6 @@ public interface StreamSourceEntityMapper {
 
     int deleteByPrimaryKey(Integer id);
 
-    List<DataConfig> selectAgentTaskDataConfig(TaskRequest taskRequest);
+    List<StreamSourceEntity> selectAgentTaskDataConfig(TaskRequest taskRequest);
 
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 3dc5433..a40f669 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -71,7 +71,9 @@
         from stream_sink
         <where>
             is_deleted = 0
-            and sink_type = #{request.sinkType, jdbcType=VARCHAR}
+            <if test="request.sinkType != null and request.sinkType != ''">
+                and sink_type = #{request.sinkType, jdbcType=VARCHAR}
+            </if>
             and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
             <if test="request.inlongStreamId != null and request.inlongStreamId != ''">
                 and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c7cbd26..317d3f9 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -232,7 +232,9 @@
         from stream_source
         <where>
             is_deleted = 0
-            and source_type = #{request.sourceType, jdbcType=VARCHAR}
+            <if test="request.sourceType != null and request.sourceType != ''">
+                and source_type = #{request.sourceType, jdbcType=VARCHAR}
+            </if>
             and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
             <if test="request.inlongStreamId != null and request.inlongStreamId != ''">
                 and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
@@ -404,7 +406,7 @@
         detail.modify_time,
         detail.uuid,
         detail.agent_ip,
-        detail.status mod 100 as op,
+        detail.status,
         detail.id,
         detail.source_type,
         detail.snapshot,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
index 7746edc..02740a3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
@@ -33,6 +33,7 @@ import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.FileAgentDataGenerateRule;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.agent.AgentStatusReportRequest;
 import org.apache.inlong.manager.common.pojo.agent.CheckAgentTaskConfRequest;
 import org.apache.inlong.manager.common.pojo.agent.ConfirmAgentIpRequest;
@@ -44,6 +45,7 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
 import org.apache.inlong.manager.dao.entity.DataSourceCmdConfigEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
 import org.apache.inlong.manager.dao.entity.SourceFileDetailEntity;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
@@ -89,7 +91,21 @@ public class AgentTaskServiceImpl implements AgentTaskService {
     }
 
     private List<DataConfig> getAgentDataConfigs(TaskRequest taskRequest) {
-        List<DataConfig> dataConfigs = streamSourceMapper.selectAgentTaskDataConfig(taskRequest);
+        List<StreamSourceEntity> sourceEntities = streamSourceMapper.selectAgentTaskDataConfig(taskRequest);
+        List<DataConfig> dataConfigs = sourceEntities.stream().map(sourceEntity -> {
+            DataConfig dataConfig = new DataConfig();
+            dataConfig.setOp(String.valueOf(sourceEntity.getStatus() % 100));
+            dataConfig.setJobId(sourceEntity.getId());
+            SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
+            dataConfig.setTaskType(sourceType.getTaskType().getType());
+            dataConfig.setInlongGroupId(sourceEntity.getInlongGroupId());
+            dataConfig.setInlongStreamId(sourceEntity.getInlongStreamId());
+            dataConfig.setIp(sourceEntity.getAgentIp());
+            dataConfig.setUuid(sourceEntity.getUuid());
+            dataConfig.setExtParams(sourceEntity.getExtParams());
+            dataConfig.setSnapshot(sourceEntity.getSnapshot());
+            return dataConfig;
+        }).collect(Collectors.toList());
         //Forward Compatible File task type
         return dataConfigs;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 5f8f2bf..b345460 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sink;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -34,6 +35,10 @@ import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkListResponse;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -171,16 +176,42 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             LOGGER.debug("begin to list sink page by " + request);
         }
         Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
-        String sinkType = request.getSinkType();
-        Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
 
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>) sinkMapper.selectByCondition(request);
+        List<SinkListResponse> sinkListResponses = Lists.newArrayList();
+        for (StreamSinkEntity entity : entityPage) {
+            SinkType sinkType = SinkType.forType(entity.getSinkType());
+            StreamSinkOperation operation = operationFactory.getInstance(sinkType);
+            switch (sinkType) {
+                case HIVE:
+                    HiveSinkListResponse hiveSinkListResponse = operation.getFromEntity(entity,
+                            HiveSinkListResponse::new);
+                    sinkListResponses.add(hiveSinkListResponse);
+                    break;
+                case CLICKHOUSE:
+                    ClickHouseSinkListResponse clickHouseSinkListResponse = operation.getFromEntity(entity,
+                            ClickHouseSinkListResponse::new);
+                    sinkListResponses.add(clickHouseSinkListResponse);
+                    break;
+                case ICEBERG:
+                    IcebergSinkListResponse icebergSinkListResponse = operation.getFromEntity(entity,
+                            IcebergSinkListResponse::new);
+                    sinkListResponses.add(icebergSinkListResponse);
+                    break;
+                case KAFKA:
+                    KafkaSinkListResponse kafkaSinkListResponse = operation.getFromEntity(entity,
+                            KafkaSinkListResponse::new);
+                    sinkListResponses.add(kafkaSinkListResponse);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format("Unsupported sinkType=%s for Inlong", sinkType));
+            }
 
+        }
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
-        StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
-        PageInfo<? extends SinkListResponse> pageInfo = operation.getPageInfo(entityPage);
-        pageInfo.setTotal(entityPage.getTotal());
+        PageInfo<? extends SinkListResponse> pageInfo = PageInfo.of(sinkListResponses);
 
         LOGGER.debug("success to list sink page");
         return pageInfo;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 4979f18..07b7721 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -35,6 +36,8 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
@@ -127,17 +130,32 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             LOGGER.debug("begin to list source page by " + request);
         }
         Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
-        String sourceType = request.getSourceType();
-        Preconditions.checkNotNull(sourceType, Constant.SOURCE_TYPE_IS_EMPTY);
 
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<StreamSourceEntity> entityPage = (Page<StreamSourceEntity>) sourceMapper.selectByCondition(request);
 
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
-        StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
-        PageInfo<? extends SourceListResponse> pageInfo = operation.getPageInfo(entityPage);
-        pageInfo.setTotal(entityPage.getTotal());
-
+        List<SourceListResponse> responses = Lists.newArrayList();
+        for (StreamSourceEntity entity : entityPage) {
+            SourceType sourceType = SourceType.forType(entity.getSourceType());
+            StreamSourceOperation operation = operationFactory.getInstance(sourceType);
+            switch (sourceType) {
+                case DB_BINLOG:
+                    BinlogSourceListResponse binlogSourceListResponse = operation.getFromEntity(entity,
+                            BinlogSourceListResponse::new);
+                    responses.add(binlogSourceListResponse);
+                    break;
+                case KAFKA:
+                    KafkaSourceListResponse kafkaSourceListResponse = operation.getFromEntity(entity,
+                            KafkaSourceListResponse::new);
+                    responses.add(kafkaSourceListResponse);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format("Unsupported sourceType=%s for Inlong", sourceType));
+            }
+        }
+        PageInfo<? extends SourceListResponse> pageInfo = PageInfo.of(responses);
         LOGGER.debug("success to list source page");
         return pageInfo;
     }