You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/24 07:59:53 UTC

[incubator-inlong] branch master updated: [INLONG-4299][Manager] Add InlongStreamExtensionInfo (#4330)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b08eebb [INLONG-4299][Manager] Add InlongStreamExtensionInfo (#4330)
77b08eebb is described below

commit 77b08eebb607d4243b7fde5bb46d02ef81efea06
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Tue May 24 15:59:48 2022 +0800

    [INLONG-4299][Manager] Add InlongStreamExtensionInfo (#4330)
    
    * Add InlongStreamExtensionInfo
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../common/pojo/stream/InlongStreamExtInfo.java}   |  41 ++---
 .../common/pojo/stream/InlongStreamInfo.java       |   3 +
 .../common/pojo/stream/InlongStreamRequest.java    |   2 +
 .../common/pojo/stream/InlongStreamResponse.java   |   3 +
 .../manager/dao/entity/InlongStreamExtEntity.java} |  36 ++--
 .../dao/mapper/InlongStreamExtEntityMapper.java    |  72 ++++++++
 .../src/main/resources/generatorConfig.xml         |   2 +-
 .../mappers/InlongStreamExtEntityMapper.xml        | 203 +++++++++++++++++++++
 .../manager/plugin/FlinkSortProcessPlugin.java     |  12 ++
 .../plugin/eventselect/DeleteProcessSelector.java  |   4 +-
 ...cessSelector.java => DeleteStreamSelector.java} |  20 +-
 .../plugin/eventselect/RestartProcessSelector.java |   8 +-
 ...essSelector.java => RestartStreamSelector.java} |  20 +-
 .../plugin/eventselect/StartupProcessSelector.java |  10 +-
 ...essSelector.java => StartupStreamSelector.java} |  22 +--
 .../plugin/eventselect/SuspendProcessSelector.java |   4 +-
 ...essSelector.java => SuspendStreamSelector.java} |  22 +--
 .../inlong/manager/plugin/flink/FlinkService.java  |  16 +-
 .../inlong/manager/plugin/flink/dto/FlinkInfo.java |   2 -
 ...SortListener.java => DeleteStreamListener.java} |  75 +++-----
 .../plugin/listener/RestartSortListener.java       |  20 +-
 ...ortListener.java => RestartStreamListener.java} |  65 +++----
 .../plugin/listener/StartupSortListener.java       |  32 +---
 ...ortListener.java => StartupStreamListener.java} |  84 +++------
 ...ortListener.java => SuspendStreamListener.java} |  75 +++-----
 .../service/core/impl/InlongStreamServiceImpl.java |  87 ++++++---
 .../service/group/InlongGroupServiceImpl.java      |  11 +-
 .../service/sort/CreateSortConfigListenerV2.java   |   9 +-
 .../sort/CreateStreamSortConfigListener.java       | 108 ++++++++---
 .../main/resources/sql/apache_inlong_manager.sql   |  17 ++
 .../manager-web/sql/apache_inlong_manager.sql      |  18 ++
 31 files changed, 702 insertions(+), 401 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamExtInfo.java
similarity index 55%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamExtInfo.java
index b3af2b3d0..fa3893638 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamExtInfo.java
@@ -15,40 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.plugin.flink.dto;
+package org.apache.inlong.manager.common.pojo.stream;
 
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-
-import java.util.List;
 
 /**
- * Flink infomation, including end point, job name, source type, etc.
+ * Inlong stream extension information
  */
 @Data
-public class FlinkInfo {
-
-    private String endpoint;
-
-    private String jobName;
-
-    private List<InlongStreamInfo> inlongStreamInfoList;
-
-    private String localJarPath;
-
-    private String localConfPath;
-
-    private String sourceType;
-
-    private String sinkType;
-
-    private String jobId;
+@ApiModel("Inlong stream extension information")
+public class InlongStreamExtInfo {
 
-    private String savepointPath;
+    @ApiModelProperty(value = "id")
+    private Integer id;
 
-    private boolean isException = false;
+    @ApiModelProperty(value = "inlong group id", required = true)
+    private String inlongGroupId;
 
-    private String exceptionMsg;
+    @ApiModelProperty(value = "inlong stream id", required = true)
+    private String inlongStreamId;
 
+    @ApiModelProperty(value = "property name")
+    private String keyName;
 
+    @ApiModelProperty(value = "property value")
+    private String keyValue;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
index 380b4c246..9826f8d52 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
@@ -109,6 +109,9 @@ public class InlongStreamInfo {
     @ApiModelProperty(value = "Field list")
     private List<InlongStreamFieldInfo> fieldList;
 
+    @ApiModelProperty(value = "Inlong stream Extension properties")
+    private List<InlongStreamExtInfo> extList;
+
     public InlongStreamResponse genResponse() {
         return CommonBeanUtils.copyProperties(this, InlongStreamResponse::new);
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
index 4fe9f923d..a972f74db 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
@@ -87,4 +87,6 @@ public class InlongStreamRequest {
     @ApiModelProperty(value = "Field list")
     private List<InlongStreamFieldInfo> fieldList;
 
+    @ApiModelProperty(value = "Inlong stream Extension properties")
+    private List<InlongStreamExtInfo> extList;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
index 9dc694fa7..fc875b98a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamResponse.java
@@ -115,4 +115,7 @@ public class InlongStreamResponse {
     @ApiModelProperty(value = "Field list")
     private List<InlongStreamFieldInfo> fieldList;
 
+    @ApiModelProperty(value = "Inlong stream Extension properties")
+    private List<InlongStreamExtInfo> extList;
+
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamExtEntity.java
similarity index 57%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamExtEntity.java
index b3af2b3d0..4bc648a6d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongStreamExtEntity.java
@@ -15,40 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.plugin.flink.dto;
+package org.apache.inlong.manager.dao.entity;
 
 import lombok.Data;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
-import java.util.List;
+import java.io.Serializable;
+import java.util.Date;
 
 /**
- * Flink infomation, including end point, job name, source type, etc.
+ * InlongStreamExtEntity, including key name, key value, etc.
  */
 @Data
-public class FlinkInfo {
+public class InlongStreamExtEntity implements Serializable {
 
-    private String endpoint;
+    private static final long serialVersionUID = 1L;
 
-    private String jobName;
+    private Integer id;
 
-    private List<InlongStreamInfo> inlongStreamInfoList;
+    private String inlongGroupId;
 
-    private String localJarPath;
+    private String inlongStreamId;
 
-    private String localConfPath;
+    private String keyName;
 
-    private String sourceType;
+    private String keyValue;
 
-    private String sinkType;
+    private Integer isDeleted;
 
-    private String jobId;
-
-    private String savepointPath;
-
-    private boolean isException = false;
-
-    private String exceptionMsg;
-
-
-}
+    private Date modifyTime;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
new file mode 100644
index 000000000..68ef19457
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public interface InlongStreamExtEntityMapper {
+
+    int deleteByPrimaryKey(Integer id);
+
+    int insert(InlongStreamExtEntity record);
+
+    int insertSelective(InlongStreamExtEntity record);
+
+    InlongStreamExtEntity selectByPrimaryKey(Integer id);
+
+    int updateByPrimaryKeySelective(InlongStreamExtEntity record);
+
+    int updateByPrimaryKeyWithBLOBs(InlongStreamExtEntity record);
+
+    int updateByPrimaryKey(InlongStreamExtEntity record);
+
+    List<InlongStreamExtEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
+    /**
+     * Insert the given extension records in a single batch statement
+     *
+     * @param extList extension records to insert
+     */
+    int insertAll(@Param("extList") List<InlongStreamExtEntity> extList);
+
+    /**
+     * Insert extension records in a single batch; a record that hits a duplicate key is updated instead
+     *
+     * @param extList extension records to insert or update
+     */
+    int insertOnDuplicateKeyUpdate(@Param("extList") List<InlongStreamExtEntity> extList);
+
+    /**
+     * Physically delete the extension records of a group, limited to one stream when a stream id is given
+     *
+     * @return rows deleted
+     */
+    int deleteAllByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
+    /**
+     * Logically delete (is_deleted = 1) the extension records of a group, limited to one stream when given
+     *
+     * @return rows updated
+     */
+    int logicDeleteAllByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index d875c8b7f..a1a977bb2 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -67,7 +67,7 @@
         </javaClientGenerator>
 
         <!-- Which entities to generate -->
-        <table tableName="data_node" domainObjectName="DataNodeEntity"
+        <table tableName="inlong_stream_ext" domainObjectName="InlongStreamExtEntity"
                 enableInsert="true" enableSelectByPrimaryKey="true"
                 enableUpdateByPrimaryKey="true" enableDeleteByPrimaryKey="true"
                 enableCountByExample="false" enableDeleteByExample="false"
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
new file mode 100644
index 000000000..747701721
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
@@ -0,0 +1,203 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper">
+  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+    <id column="id" jdbcType="INTEGER" property="id" />
+    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+    <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
+    <result column="key_name" jdbcType="VARCHAR" property="keyName" />
+    <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" />
+    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+  </resultMap>
+  <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+    <result column="key_value" jdbcType="LONGVARCHAR" property="keyValue" />
+  </resultMap>
+  <sql id="Base_Column_List">
+    id, inlong_group_id, inlong_stream_id, key_name, is_deleted, modify_time
+  </sql>
+  <sql id="Blob_Column_List">
+    key_value
+  </sql>
+  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
+    select 
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from inlong_stream_ext
+    where id = #{id,jdbcType=INTEGER}
+  </select>
+  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    delete from inlong_stream_ext
+    where id = #{id,jdbcType=INTEGER}
+  </delete>
+  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+    insert into inlong_stream_ext (id, inlong_group_id, inlong_stream_id, 
+      key_name, is_deleted, modify_time, 
+      key_value)
+    values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, 
+      #{keyName,jdbcType=VARCHAR}, #{isDeleted,jdbcType=INTEGER}, #{modifyTime,jdbcType=TIMESTAMP}, 
+      #{keyValue,jdbcType=LONGVARCHAR})
+  </insert>
+  <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+    insert into inlong_stream_ext
+    <trim prefix="(" suffix=")" suffixOverrides=",">
+      <if test="id != null">
+        id,
+      </if>
+      <if test="inlongGroupId != null">
+        inlong_group_id,
+      </if>
+      <if test="inlongStreamId != null">
+        inlong_stream_id,
+      </if>
+      <if test="keyName != null">
+        key_name,
+      </if>
+      <if test="isDeleted != null">
+        is_deleted,
+      </if>
+      <if test="modifyTime != null">
+        modify_time,
+      </if>
+      <if test="keyValue != null">
+        key_value,
+      </if>
+    </trim>
+    <trim prefix="values (" suffix=")" suffixOverrides=",">
+      <if test="id != null">
+        #{id,jdbcType=INTEGER},
+      </if>
+      <if test="inlongGroupId != null">
+        #{inlongGroupId,jdbcType=VARCHAR},
+      </if>
+      <if test="inlongStreamId != null">
+        #{inlongStreamId,jdbcType=VARCHAR},
+      </if>
+      <if test="keyName != null">
+        #{keyName,jdbcType=VARCHAR},
+      </if>
+      <if test="isDeleted != null">
+        #{isDeleted,jdbcType=INTEGER},
+      </if>
+      <if test="modifyTime != null">
+        #{modifyTime,jdbcType=TIMESTAMP},
+      </if>
+      <if test="keyValue != null">
+        #{keyValue,jdbcType=LONGVARCHAR},
+      </if>
+    </trim>
+  </insert>
+  <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+    update inlong_stream_ext
+    <set>
+      <if test="inlongGroupId != null">
+        inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+      </if>
+      <if test="inlongStreamId != null">
+        inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+      </if>
+      <if test="keyName != null">
+        key_name = #{keyName,jdbcType=VARCHAR},
+      </if>
+      <if test="isDeleted != null">
+        is_deleted = #{isDeleted,jdbcType=INTEGER},
+      </if>
+      <if test="modifyTime != null">
+        modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+      </if>
+      <if test="keyValue != null">
+        key_value = #{keyValue,jdbcType=LONGVARCHAR},
+      </if>
+    </set>
+    where id = #{id,jdbcType=INTEGER}
+  </update>
+  <update id="updateByPrimaryKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+    update inlong_stream_ext
+    set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+      inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+      key_name = #{keyName,jdbcType=VARCHAR},
+      is_deleted = #{isDeleted,jdbcType=INTEGER},
+      modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+      key_value = #{keyValue,jdbcType=LONGVARCHAR}
+    where id = #{id,jdbcType=INTEGER}
+  </update>
+  <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+    update inlong_stream_ext
+    set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+      inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+      key_name = #{keyName,jdbcType=VARCHAR},
+      is_deleted = #{isDeleted,jdbcType=INTEGER},
+      modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+    where id = #{id,jdbcType=INTEGER}
+  </update>
+  <!-- Query live rows by group id (and stream id when given); key_value is selected and mapped explicitly -->
+  <select id="selectByRelatedId" resultMap="ResultMapWithBLOBs">
+    select <include refid="Base_Column_List"/>, <include refid="Blob_Column_List"/>
+    from inlong_stream_ext
+    where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+    <if test="streamId != null and streamId != ''">
+      and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+    </if>
+    and is_deleted = 0
+  </select>
+  <!-- Bulk insert-->
+  <insert id="insertAll" parameterType="java.util.List">
+    insert into inlong_stream_ext
+    (id, inlong_group_id, inlong_stream_id, key_name, key_value, is_deleted)
+    values
+    <foreach collection="extList" separator="," index="index" item="item">
+      (#{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.keyName}, #{item.keyValue}, #{item.isDeleted})
+    </foreach>
+  </insert>
+  <!-- Bulk insert,update if exists-->
+  <insert id="insertOnDuplicateKeyUpdate" parameterType="java.util.List">
+    insert into inlong_stream_ext
+    (id, inlong_group_id, inlong_stream_id, key_name, key_value, is_deleted)
+    values
+    <foreach collection="extList" separator="," index="index" item="item">
+      (#{item.id}, #{item.inlongGroupId}, #{item.inlongStreamId}, #{item.keyName}, #{item.keyValue}, #{item.isDeleted})
+    </foreach>
+    ON DUPLICATE KEY UPDATE
+    inlong_group_id = VALUES(inlong_group_id),
+    inlong_stream_id = VALUES(inlong_stream_id),
+    key_name = VALUES(key_name),
+    key_value = VALUES(key_value),
+    is_deleted = VALUES(is_deleted)
+  </insert>
+  <delete id="deleteAllByRelatedId">
+    delete
+    from inlong_stream_ext
+    where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+    <if test="streamId != null and streamId != ''">
+      and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+    </if>
+  </delete>
+  <update id="logicDeleteAllByRelatedId">
+    update inlong_stream_ext
+    set is_deleted = 1
+    where is_deleted = 0
+    and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+    <if test="streamId != null and streamId != ''">
+      and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+    </if>
+  </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
index 04407ab04..45cf2b1e2 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
@@ -19,13 +19,21 @@ package org.apache.inlong.manager.plugin;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.plugin.eventselect.DeleteProcessSelector;
+import org.apache.inlong.manager.plugin.eventselect.DeleteStreamSelector;
 import org.apache.inlong.manager.plugin.eventselect.RestartProcessSelector;
+import org.apache.inlong.manager.plugin.eventselect.RestartStreamSelector;
 import org.apache.inlong.manager.plugin.eventselect.StartupProcessSelector;
+import org.apache.inlong.manager.plugin.eventselect.StartupStreamSelector;
 import org.apache.inlong.manager.plugin.eventselect.SuspendProcessSelector;
+import org.apache.inlong.manager.plugin.eventselect.SuspendStreamSelector;
 import org.apache.inlong.manager.plugin.listener.DeleteSortListener;
+import org.apache.inlong.manager.plugin.listener.DeleteStreamListener;
 import org.apache.inlong.manager.plugin.listener.RestartSortListener;
+import org.apache.inlong.manager.plugin.listener.RestartStreamListener;
 import org.apache.inlong.manager.plugin.listener.StartupSortListener;
+import org.apache.inlong.manager.plugin.listener.StartupStreamListener;
 import org.apache.inlong.manager.plugin.listener.SuspendSortListener;
+import org.apache.inlong.manager.plugin.listener.SuspendStreamListener;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 import org.apache.inlong.manager.workflow.event.task.DataSourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -52,6 +60,10 @@ public class FlinkSortProcessPlugin implements ProcessPlugin {
         listeners.put(new RestartSortListener(), new RestartProcessSelector());
         listeners.put(new SuspendSortListener(), new SuspendProcessSelector());
         listeners.put(new StartupSortListener(), new StartupProcessSelector());
+        listeners.put(new DeleteStreamListener(), new DeleteStreamSelector());
+        listeners.put(new RestartStreamListener(), new RestartStreamSelector());
+        listeners.put(new SuspendStreamListener(), new SuspendStreamSelector());
+        listeners.put(new StartupStreamListener(), new StartupStreamSelector());
         return listeners;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
index d7a532cd8..470b707dc 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
@@ -37,7 +37,7 @@ public class DeleteProcessSelector implements EventSelector {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add deleteProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+            log.info("not add deleteProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
                     groupId);
             return false;
         }
@@ -45,7 +45,7 @@ public class DeleteProcessSelector implements EventSelector {
         GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
         boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
         if (!flag) {
-            log.info("not add deleteProcess listener as the operate was not DELETE for groupId [{}]", groupId);
+            log.info("not add deleteProcess listener, as the operate was not DELETE for groupId [{}]", groupId);
             return false;
         }
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
similarity index 62%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
index d7a532cd8..c0b738c0a 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/DeleteStreamSelector.java
@@ -20,36 +20,38 @@ package org.apache.inlong.manager.plugin.eventselect;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
 /**
- * Selector of delete process event.
+ * Selector of delete stream event.
  */
 @Slf4j
-public class DeleteProcessSelector implements EventSelector {
+public class DeleteStreamSelector implements EventSelector {
 
     @SneakyThrows
     @Override
     public boolean accept(WorkflowContext context) {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add deleteProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+        if (!(processForm instanceof StreamResourceProcessForm)) {
+            log.info("not add deleteStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
                     groupId);
             return false;
         }
 
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
+        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+        String streamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
+        boolean flag = streamResourceProcessForm.getGroupOperateType() == GroupOperateType.DELETE;
         if (!flag) {
-            log.info("not add deleteProcess listener as the operate was not DELETE for groupId [{}]", groupId);
+            log.info("not add deleteStream listener, as the operate was not DELETE for groupId [{}] and streamId [{}]",
+                    groupId, streamId);
             return false;
         }
 
-        log.info("add deleteProcess listener for groupId [{}]", groupId);
+        log.info("add deleteStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
         return true;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
index ebc727ca0..49f697aa7 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
@@ -37,15 +37,15 @@ public class RestartProcessSelector implements EventSelector {
         ProcessForm processForm = workflowContext.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add restartProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+            log.info("not add restartProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
                     groupId);
             return false;
         }
 
-        GroupResourceProcessForm updateProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = updateProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
+        GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
+        boolean flag = groupProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
         if (!flag) {
-            log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
+            log.info("not add restartProcess listener, as the operate was not RESTART for groupId [{}]", groupId);
             return false;
         }
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
similarity index 64%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
index ebc727ca0..b9fb24aa9 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartStreamSelector.java
@@ -20,36 +20,38 @@ package org.apache.inlong.manager.plugin.eventselect;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
 /**
- * Selector of restart process event.
+ * Selector of restart stream event.
  */
 @Slf4j
-public class RestartProcessSelector implements EventSelector {
+public class RestartStreamSelector implements EventSelector {
 
     @SneakyThrows
     @Override
     public boolean accept(WorkflowContext workflowContext) {
         ProcessForm processForm = workflowContext.getProcessForm();
         String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add restartProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+        if (!(processForm instanceof StreamResourceProcessForm)) {
+            log.info("not add restartStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
                     groupId);
             return false;
         }
 
-        GroupResourceProcessForm updateProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = updateProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
+        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+        boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
         if (!flag) {
-            log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
+            log.info("not add restartStream listener, as the operate was not RESTART for groupId [{}] streamId [{}]",
+                    groupId, streamId);
             return false;
         }
 
-        log.info("add restartProcess listener for groupId [{}]", groupId);
+        log.info("add restartStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
         return true;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
index 49980eaa0..c4ea4293b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupProcessSelector.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.eventselect;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -36,7 +37,14 @@ public class StartupProcessSelector implements EventSelector {
         ProcessForm processForm = workflowContext.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add startupProcess listener as the form was not GroupResource for groupId [{}]", groupId);
+            log.info("not add startupProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
+                    groupId);
+            return false;
+        }
+        GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm;
+        boolean flag = groupProcessForm.getGroupOperateType() == GroupOperateType.INIT;
+        if (!flag) {
+            log.info("not add startupProcess listener, as the operate was not INIT for groupId [{}]", groupId);
             return false;
         }
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
similarity index 63%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
index ebc727ca0..85514d209 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/StartupStreamSelector.java
@@ -20,36 +20,36 @@ package org.apache.inlong.manager.plugin.eventselect;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
 /**
- * Selector of restart process event.
+ * Selector of startup stream event.
  */
 @Slf4j
-public class RestartProcessSelector implements EventSelector {
+public class StartupStreamSelector implements EventSelector {
 
     @SneakyThrows
     @Override
     public boolean accept(WorkflowContext workflowContext) {
         ProcessForm processForm = workflowContext.getProcessForm();
         String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add restartProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+        if (!(processForm instanceof StreamResourceProcessForm)) {
+            log.info("not add startupStream listener, as the form was not StreamResourceProcessForm for groupId [{}]",
                     groupId);
             return false;
         }
-
-        GroupResourceProcessForm updateProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = updateProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
+        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+        boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.INIT;
+        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
         if (!flag) {
-            log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
+            log.info("not add startupStream listener, as the operate was not INIT for groupId [{}] and streamId [{}]",
+                    groupId, streamId);
             return false;
         }
-
-        log.info("add restartProcess listener for groupId [{}]", groupId);
+        log.info("add startupStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
         return true;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
index 271b7496e..ea5576c68 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendProcessSelector.java
@@ -37,7 +37,7 @@ public class SuspendProcessSelector implements EventSelector {
         ProcessForm processForm = workflowContext.getProcessForm();
         String groupId = processForm.getInlongGroupId();
         if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add suspendProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
+            log.info("not add suspendProcess listener, as the form was not GroupResourceProcessForm for groupId [{}]",
                     groupId);
             return false;
         }
@@ -45,7 +45,7 @@ public class SuspendProcessSelector implements EventSelector {
         GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
         boolean flag = groupResourceProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
         if (!flag) {
-            log.info("not add suspendProcess listener as the operate was not SUSPEND for groupId [{}]", groupId);
+            log.info("not add suspendProcess listener, as the operate was not SUSPEND for groupId [{}]", groupId);
             return false;
         }
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
similarity index 63%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
index ebc727ca0..214e7cae6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/RestartProcessSelector.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/eventselect/SuspendStreamSelector.java
@@ -20,36 +20,36 @@ package org.apache.inlong.manager.plugin.eventselect;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
 /**
- * Selector of restart process event.
+ * Selector of suspend stream event.
  */
 @Slf4j
-public class RestartProcessSelector implements EventSelector {
+public class SuspendStreamSelector implements EventSelector {
 
     @SneakyThrows
     @Override
     public boolean accept(WorkflowContext workflowContext) {
         ProcessForm processForm = workflowContext.getProcessForm();
         String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            log.info("not add restartProcess listener as the form was not GroupResourceProcessForm for groupId [{}]",
-                    groupId);
+        if (!(processForm instanceof StreamResourceProcessForm)) {
+            log.info("not add suspendStream listener as StreamResourceProcessForm for groupId [{}]", groupId);
             return false;
         }
-
-        GroupResourceProcessForm updateProcessForm = (GroupResourceProcessForm) processForm;
-        boolean flag = updateProcessForm.getGroupOperateType() == GroupOperateType.RESTART;
+        StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm;
+        String streamId = streamProcessForm.getStreamInfo().getInlongStreamId();
+        boolean flag = streamProcessForm.getGroupOperateType() == GroupOperateType.SUSPEND;
         if (!flag) {
-            log.info("not add restartProcess listener as the operate was not RESTART for groupId [{}]", groupId);
+            log.info("not add suspendStream listener as the operate SUSPEND for groupId [{}] and streamId [{}]",
+                    groupId, streamId);
             return false;
         }
 
-        log.info("add restartProcess listener for groupId [{}]", groupId);
+        log.info("add suspendStream listener for groupId [{}] and streamId [{}]", groupId, streamId);
         return true;
     }
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index e23439afb..31e5b2a5e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -192,7 +192,7 @@ public class FlinkService {
     private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
         String localJarPath = flinkInfo.getLocalJarPath();
         File jarFile = new File(localJarPath);
-        String[] programArgs = genProgramArgs(flinkInfo, flinkConfig);
+        String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
 
         PackagedProgram program = PackagedProgram.newBuilder()
                 .setConfiguration(configuration)
@@ -240,7 +240,8 @@ public class FlinkService {
     /**
      * Build the program of the Flink job.
      */
-    private String[] genProgramArgs(FlinkInfo flinkInfo,FlinkConfig flinkConfig) {
+    @Deprecated
+    private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
         List<String> list = new ArrayList<>();
         list.add("-cluster-id");
         list.add(flinkInfo.getJobName());
@@ -262,4 +263,15 @@ public class FlinkService {
         return list.toArray(new String[0]);
     }
 
+    private String[] genProgramArgsV2(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
+        List<String> list = new ArrayList<>();
+        list.add("-cluster-id");
+        list.add(flinkInfo.getJobName());
+        list.add("-dataflow.info.file");
+        list.add(flinkInfo.getLocalConfPath());
+        list.add("-checkpoint.interval");
+        list.add("60000");
+        return list.toArray(new String[0]);
+    }
+
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index b3af2b3d0..86d6d8ca1 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -49,6 +49,4 @@ public class FlinkInfo {
     private boolean isException = false;
 
     private String exceptionMsg;
-
-
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
similarity index 59%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 485d1bca4..4b2b2ff65 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -18,19 +18,19 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -38,17 +38,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
- * Listener of restart sort.
+ * Listener of delete stream sort.
  */
 @Slf4j
-public class RestartSortListener implements SortOperateListener {
+public class DeleteStreamListener implements SortOperateListener {
 
     public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -60,24 +58,25 @@ public class RestartSortListener implements SortOperateListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
-
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
-        log.info("inlong group ext info: {}", extList);
-
-        Map<String, String> kvConf = extList.stream().collect(
+        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+        InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+        List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
+        log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+        InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+        List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+        log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        Map<String, String> kvConf = groupExtList.stream().collect(
                 Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        streamExtList.stream().forEach(extInfo -> {
+            kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+        });
         String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
-                    groupId);
+            String message = String.format(
+                    "delete sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
+                    groupId, streamId);
             log.error(message);
             return ListenerResult.fail(message);
         }
@@ -88,51 +87,27 @@ public class RestartSortListener implements SortOperateListener {
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId [%s]", groupId);
-            return ListenerResult.fail(message);
-        }
-        String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
-        if (StringUtils.isEmpty(dataFlows)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
-
-        // TODO Support more than one dataflow in one sort job
-        Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
-                OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
-                });
-        Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
-        JsonNode dataFlow = null;
-        if (dataflowOptional.isPresent()) {
-            dataFlow = dataflowOptional.get();
-        }
-        if (Objects.isNull(dataFlow)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
-            log.warn(message);
+            String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
             return ListenerResult.fail(message);
         }
 
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-        String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
-        flinkInfo.setJobName(jobName);
         String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
-            flinkOperation.genPath(flinkInfo, dataFlow.toString());
-            flinkOperation.restart(flinkInfo);
-            log.info("job restart success for [{}]", jobId);
+            flinkOperation.delete(flinkInfo);
+            log.info("job delete success for [{}]", jobId);
             return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
             flinkOperation.pollJobStatus(flinkInfo);
 
-            String message = String.format("restart sort failed for groupId [%s] ", groupId);
+            String message = String.format("delete sort failed for groupId [%s] streamId [%s]", groupId, streamId);
             log.error(message, e);
             return ListenerResult.fail(message + e.getMessage());
         }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 485d1bca4..507972587 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -38,8 +37,6 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -98,21 +95,6 @@ public class RestartSortListener implements SortOperateListener {
             return ListenerResult.fail(message);
         }
 
-        // TODO Support more than one dataflow in one sort job
-        Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
-                OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
-                });
-        Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
-        JsonNode dataFlow = null;
-        if (dataflowOptional.isPresent()) {
-            dataFlow = dataflowOptional.get();
-        }
-        if (Objects.isNull(dataFlow)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
-            log.warn(message);
-            return ListenerResult.fail(message);
-        }
-
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
@@ -123,7 +105,7 @@ public class RestartSortListener implements SortOperateListener {
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
-            flinkOperation.genPath(flinkInfo, dataFlow.toString());
+            flinkOperation.genPath(flinkInfo, dataFlows);
             flinkOperation.restart(flinkInfo);
             log.info("job restart success for [{}]", jobId);
             return ListenerResult.success();
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
similarity index 68%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 485d1bca4..3974da351 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -18,14 +18,15 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
@@ -38,17 +39,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
- * Listener of restart sort.
+ * Listener of restart stream sort.
  */
 @Slf4j
-public class RestartSortListener implements SortOperateListener {
+public class RestartStreamListener implements SortOperateListener {
 
     public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -60,24 +59,25 @@ public class RestartSortListener implements SortOperateListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
-
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
-        log.info("inlong group ext info: {}", extList);
-
-        Map<String, String> kvConf = extList.stream().collect(
+        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+        InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+        List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
+        log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+        InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+        List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+        log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        Map<String, String> kvConf = groupExtList.stream().collect(
                 Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        streamExtList.stream().forEach(extInfo -> {
+            kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+        });
         String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
-                    groupId);
+            String message = String.format(
+                    "restart sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
+                    groupId, streamId);
             log.error(message);
             return ListenerResult.fail(message);
         }
@@ -88,31 +88,16 @@ public class RestartSortListener implements SortOperateListener {
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId [%s]", groupId);
+            String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
             return ListenerResult.fail(message);
         }
         String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
         if (StringUtils.isEmpty(dataFlows)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
+            String message = String.format("dataflow is empty for groupId [%s] streamId [%s]", groupId, streamId);
             log.error(message);
             return ListenerResult.fail(message);
         }
 
-        // TODO Support more than one dataflow in one sort job
-        Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
-                OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
-                });
-        Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
-        JsonNode dataFlow = null;
-        if (dataflowOptional.isPresent()) {
-            dataFlow = dataflowOptional.get();
-        }
-        if (Objects.isNull(dataFlow)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
-            log.warn(message);
-            return ListenerResult.fail(message);
-        }
-
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
@@ -123,7 +108,7 @@ public class RestartSortListener implements SortOperateListener {
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
-            flinkOperation.genPath(flinkInfo, dataFlow.toString());
+            flinkOperation.genPath(flinkInfo, dataFlows);
             flinkOperation.restart(flinkInfo);
             log.info("job restart success for [{}]", jobId);
             return ListenerResult.success();
@@ -132,7 +117,7 @@ public class RestartSortListener implements SortOperateListener {
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
             flinkOperation.pollJobStatus(flinkInfo);
 
-            String message = String.format("restart sort failed for groupId [%s] ", groupId);
+            String message = String.format("restart sort failed for groupId [%s] streamId [%s] ", groupId, streamId);
             log.error(message, e);
             return ListenerResult.fail(message + e.getMessage());
         }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 150780967..3d99974ab 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -38,8 +37,6 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -90,19 +87,6 @@ public class StartupSortListener implements SortOperateListener {
             log.error(message);
             return ListenerResult.fail(message);
         }
-        Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
-                OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
-                });
-        Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
-        JsonNode dataFlow = null;
-        if (dataflowOptional.isPresent()) {
-            dataFlow = dataflowOptional.get();
-        }
-        if (Objects.isNull(dataFlow)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
-            log.warn(message);
-            return ListenerResult.fail(message);
-        }
 
         FlinkInfo flinkInfo = new FlinkInfo();
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
@@ -110,17 +94,15 @@ public class StartupSortListener implements SortOperateListener {
         String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
         flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos());
-        parseDataflow(dataFlow, flinkInfo);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
 
         try {
-            flinkOperation.genPath(flinkInfo, dataFlow.toString());
+            flinkOperation.genPath(flinkInfo, dataFlows);
             flinkOperation.start(flinkInfo);
             log.info("job submit success, jobId is [{}]", flinkInfo.getJobId());
         } catch (Exception e) {
-            // TODO why call 4 times
             flinkOperation.pollJobStatus(flinkInfo);
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
@@ -147,18 +129,6 @@ public class StartupSortListener implements SortOperateListener {
         extInfoList.add(extInfo);
     }
 
-    /**
-     * Init FlinkConf
-     */
-    private void parseDataflow(JsonNode dataflow, FlinkInfo flinkInfo) {
-        JsonNode sourceInfo = dataflow.get(Constants.SOURCE_INFO);
-        String sourceType = sourceInfo.get(Constants.TYPE).asText();
-        flinkInfo.setSourceType(sourceType);
-        JsonNode sinkInfo = dataflow.get(Constants.SINK_INFO);
-        String sinkType = sinkInfo.get(Constants.TYPE).asText();
-        flinkInfo.setSinkType(sinkType);
-    }
-
     @Override
     public boolean async() {
         return false;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
similarity index 60%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 150780967..b64633830 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -18,14 +18,15 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
@@ -38,17 +39,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
- * Listener of startup sort.
+ * Listener of startup stream sort.
  */
 @Slf4j
-public class StartupSortListener implements SortOperateListener {
+public class StartupStreamListener implements SortOperateListener {
 
     public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -60,22 +59,21 @@ public class StartupSortListener implements SortOperateListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            String message = String.format("process form was not GroupResource for groupId [%s]", groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
-
-        GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
-        InlongGroupInfo inlongGroupInfo = groupResourceForm.getGroupInfo();
-        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
-        log.info("inlong group ext info: {}", extList);
-
-        Map<String, String> kvConf = extList.stream().filter(v -> StringUtils.isNotEmpty(v.getKeyName())
-                && StringUtils.isNotEmpty(v.getKeyValue())).collect(Collectors.toMap(
-                InlongGroupExtInfo::getKeyName,
-                InlongGroupExtInfo::getKeyValue));
+        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+        InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+        List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
+        log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+        InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+        List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+        log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+
+        Map<String, String> kvConf = groupExtList.stream().collect(
+                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        streamExtList.stream().forEach(extInfo -> {
+            kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+        });
         String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
         if (StringUtils.isNotEmpty(sortExt)) {
             Map<String, String> result = OBJECT_MAPPER.convertValue(OBJECT_MAPPER.readTree(sortExt),
@@ -86,52 +84,36 @@ public class StartupSortListener implements SortOperateListener {
 
         String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
         if (StringUtils.isEmpty(dataFlows)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
+            String message = String.format("dataflow is empty for groupId [%s] and streamId [%s]", groupId, streamId);
             log.error(message);
             return ListenerResult.fail(message);
         }
-        Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
-                OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
-                });
-        Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
-        JsonNode dataFlow = null;
-        if (dataflowOptional.isPresent()) {
-            dataFlow = dataflowOptional.get();
-        }
-        if (Objects.isNull(dataFlow)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
-            log.warn(message);
-            return ListenerResult.fail(message);
-        }
 
         FlinkInfo flinkInfo = new FlinkInfo();
         String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
         flinkInfo.setJobName(jobName);
         String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
-        flinkInfo.setInlongStreamInfoList(groupResourceForm.getStreamInfos());
-        parseDataflow(dataFlow, flinkInfo);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
 
         try {
-            flinkOperation.genPath(flinkInfo, dataFlow.toString());
+            flinkOperation.genPath(flinkInfo, dataFlows);
             flinkOperation.start(flinkInfo);
             log.info("job submit success, jobId is [{}]", flinkInfo.getJobId());
         } catch (Exception e) {
-            // TODO why call 4 times
             flinkOperation.pollJobStatus(flinkInfo);
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
             flinkOperation.pollJobStatus(flinkInfo);
 
-            String message = String.format("startup sort failed for groupId [%s] ", groupId);
+            String message = String.format("startup sort failed for groupId [%s] streamId [%s]", groupId, streamId);
             log.error(message, e);
             return ListenerResult.fail(message + e.getMessage());
         }
 
-        saveInfo(groupId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), extList);
+        saveInfo(groupId, streamId, InlongGroupSettings.SORT_JOB_ID, flinkInfo.getJobId(), streamExtList);
         flinkOperation.pollJobStatus(flinkInfo);
         return ListenerResult.success();
     }
@@ -139,26 +121,16 @@ public class StartupSortListener implements SortOperateListener {
     /**
      * Save ext info into list.
      */
-    private void saveInfo(String inlongGroupId, String keyName, String keyValue, List<InlongGroupExtInfo> extInfoList) {
-        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+    private void saveInfo(String inlongGroupId, String inlongStreamId, String keyName, String keyValue,
+            List<InlongStreamExtInfo> extInfoList) {
+        InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
         extInfo.setInlongGroupId(inlongGroupId);
+        extInfo.setInlongStreamId(inlongStreamId);
         extInfo.setKeyName(keyName);
         extInfo.setKeyValue(keyValue);
         extInfoList.add(extInfo);
     }
 
-    /**
-     * Init FlinkConf
-     */
-    private void parseDataflow(JsonNode dataflow, FlinkInfo flinkInfo) {
-        JsonNode sourceInfo = dataflow.get(Constants.SOURCE_INFO);
-        String sourceType = sourceInfo.get(Constants.TYPE).asText();
-        flinkInfo.setSourceType(sourceType);
-        JsonNode sinkInfo = dataflow.get(Constants.SINK_INFO);
-        String sinkType = sinkInfo.get(Constants.TYPE).asText();
-        flinkInfo.setSinkType(sinkType);
-    }
-
     @Override
     public boolean async() {
         return false;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
similarity index 59%
copy from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
copy to inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index 485d1bca4..e5cb9cb7b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -18,19 +18,19 @@
 package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -38,17 +38,15 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
 /**
- * Listener of restart sort.
+ * Listener of suspend stream sort.
  */
 @Slf4j
-public class RestartSortListener implements SortOperateListener {
+public class SuspendStreamListener implements SortOperateListener {
 
     public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -60,24 +58,25 @@ public class RestartSortListener implements SortOperateListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         ProcessForm processForm = context.getProcessForm();
-        String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            String message = String.format("process form was not GroupResourceProcessForm for groupId [%s]", groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
-
-        GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm;
-        InlongGroupInfo inlongGroupInfo = groupResourceProcessForm.getGroupInfo();
-        List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
-        log.info("inlong group ext info: {}", extList);
-
-        Map<String, String> kvConf = extList.stream().collect(
+        StreamResourceProcessForm streamResourceProcessForm = (StreamResourceProcessForm) processForm;
+        InlongGroupInfo groupInfo = streamResourceProcessForm.getGroupInfo();
+        List<InlongGroupExtInfo> groupExtList = groupInfo.getExtList();
+        log.info("inlong group :{} ext info: {}", groupInfo.getInlongGroupId(), groupExtList);
+        InlongStreamInfo streamInfo = streamResourceProcessForm.getStreamInfo();
+        List<InlongStreamExtInfo> streamExtList = streamInfo.getExtList();
+        log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        Map<String, String> kvConf = groupExtList.stream().collect(
                 Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        streamExtList.stream().forEach(extInfo -> {
+            kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+        });
         String sortExt = kvConf.get(InlongGroupSettings.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
-                    groupId);
+            String message = String.format(
+                    "suspend sort failed for groupId [%s] streamId [%s], as the sort properties is empty",
+                    groupId, streamId);
             log.error(message);
             return ListenerResult.fail(message);
         }
@@ -88,51 +87,27 @@ public class RestartSortListener implements SortOperateListener {
         kvConf.putAll(result);
         String jobId = kvConf.get(InlongGroupSettings.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
-            String message = String.format("sort job id is empty for groupId [%s]", groupId);
-            return ListenerResult.fail(message);
-        }
-        String dataFlows = kvConf.get(InlongGroupSettings.DATA_FLOW);
-        if (StringUtils.isEmpty(dataFlows)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
-        }
-
-        // TODO Support more than one dataflow in one sort job
-        Map<String, JsonNode> dataflowMap = OBJECT_MAPPER.convertValue(
-                OBJECT_MAPPER.readTree(dataFlows), new TypeReference<Map<String, JsonNode>>() {
-                });
-        Optional<JsonNode> dataflowOptional = dataflowMap.values().stream().findFirst();
-        JsonNode dataFlow = null;
-        if (dataflowOptional.isPresent()) {
-            dataFlow = dataflowOptional.get();
-        }
-        if (Objects.isNull(dataFlow)) {
-            String message = String.format("dataflow is empty for groupId [%s]", groupId);
-            log.warn(message);
+            String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
             return ListenerResult.fail(message);
         }
 
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-        String jobName = Constants.INLONG + context.getProcessForm().getInlongGroupId();
-        flinkInfo.setJobName(jobName);
         String sortUrl = kvConf.get(InlongGroupSettings.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
 
         FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint());
         FlinkOperation flinkOperation = new FlinkOperation(flinkService);
         try {
-            flinkOperation.genPath(flinkInfo, dataFlow.toString());
-            flinkOperation.restart(flinkInfo);
-            log.info("job restart success for [{}]", jobId);
+            flinkOperation.stop(flinkInfo);
+            log.info("job suspend success for [{}]", jobId);
             return ListenerResult.success();
         } catch (Exception e) {
             flinkInfo.setException(true);
             flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
             flinkOperation.pollJobStatus(flinkInfo);
 
-            String message = String.format("restart sort failed for groupId [%s] ", groupId);
+            String message = String.format("suspend sort failed for groupId [%s] streamId[%s]", groupId, streamId);
             log.error(message, e);
             return ListenerResult.fail(message + e.getMessage());
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index d1830fb04..564350292 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
@@ -46,9 +47,11 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
@@ -82,6 +85,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Autowired
     private InlongStreamFieldEntityMapper streamFieldMapper;
     @Autowired
+    private InlongStreamExtEntityMapper streamExtMapper;
+    @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
     private StreamSourceService sourceService;
@@ -117,7 +122,10 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         streamEntity.setCreateTime(new Date());
 
         streamMapper.insertSelective(streamEntity);
-        this.saveField(groupId, streamId, request.getFieldList());
+        saveField(groupId, streamId, request.getFieldList());
+        if (CollectionUtils.isNotEmpty(request.getExtList())) {
+            saveOrUpdateExt(groupId, streamId, request.getExtList());
+        }
 
         LOGGER.info("success to save inlong stream info for groupId={}", groupId);
         return streamEntity.getId();
@@ -135,12 +143,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
         }
 
-        InlongStreamInfo response = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
-        List<InlongStreamFieldInfo> streamFields = this.getStreamFields(groupId, streamId);
-        response.setFieldList(streamFields);
-
+        InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
+        List<InlongStreamFieldInfo> streamFields = getStreamFields(groupId, streamId);
+        streamInfo.setFieldList(streamFields);
+        List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, streamId);
+        List<InlongStreamExtInfo> exts = CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
+        streamInfo.setExtList(exts);
         LOGGER.info("success to get inlong stream for groupId={}", groupId);
-        return response;
+        return streamInfo;
     }
 
     @Override
@@ -149,15 +159,23 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         List<InlongStreamEntity> inlongStreamEntityList = streamMapper.selectByGroupId(groupId);
         List<InlongStreamInfo> streamList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
                 InlongStreamInfo::new);
-        List<InlongStreamFieldInfo> streamFields = this.getStreamFields(groupId, null);
+        List<InlongStreamFieldInfo> streamFields = getStreamFields(groupId, null);
         Map<String, List<InlongStreamFieldInfo>> streamFieldMap = streamFields.stream().collect(
                 Collectors.groupingBy(InlongStreamFieldInfo::getInlongStreamId,
                         HashMap::new,
                         Collectors.toCollection(ArrayList::new)));
+        List<InlongStreamExtEntity> extEntities = streamExtMapper.selectByRelatedId(groupId, null);
+        Map<String, List<InlongStreamExtInfo>> extInfoMap = extEntities.stream()
+                .map(extEntity -> CommonBeanUtils.copyProperties(extEntity, InlongStreamExtInfo::new))
+                .collect(Collectors.groupingBy(InlongStreamExtInfo::getInlongStreamId,
+                        HashMap::new,
+                        Collectors.toCollection(ArrayList::new)));
         streamList.stream().forEach(streamInfo -> {
             String streamId = streamInfo.getInlongStreamId();
             List<InlongStreamFieldInfo> fieldInfos = streamFieldMap.get(streamId);
             streamInfo.setFieldList(fieldInfos);
+            List<InlongStreamExtInfo> extInfos = extInfoMap.get(streamId);
+            streamInfo.setExtList(extInfos);
         });
         return streamList;
     }
@@ -245,7 +263,10 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         streamMapper.updateByIdentifierSelective(streamEntity);
 
         // Update field information
-        this.updateField(groupId, streamId, request.getFieldList());
+        updateField(groupId, streamId, request.getFieldList());
+        // Update extension info
+        List<InlongStreamExtInfo> extInfos = request.getExtList();
+        saveOrUpdateExt(groupId, streamId, extInfos);
 
         LOGGER.info("success to update inlong stream for groupId={}", groupId);
         return true;
@@ -288,6 +309,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         // Logically delete the associated field table
         LOGGER.debug("begin to delete inlong stream field, streamId={}", streamId);
         streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
+        streamExtMapper.logicDeleteAllByRelatedId(groupId, streamId);
 
         LOGGER.info("success to delete inlong stream, ext property and fields for groupId={}", groupId);
         return true;
@@ -316,6 +338,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             String streamId = entity.getInlongStreamId();
             // Logically delete the associated field, source and sink info
             streamFieldMapper.logicDeleteAllByIdentifier(groupId, streamId);
+            streamExtMapper.logicDeleteAllByRelatedId(groupId, streamId);
             sourceService.logicDeleteAll(groupId, streamId, operator);
             sinkService.logicDeleteAll(groupId, streamId, operator);
         }
@@ -357,17 +380,17 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         // Check whether it can be added: check by lower-level specific services
         // this.checkBizIsTempStatus(streamInfo.getInlongGroupId());
 
-        // 1. Save inlong stream
-        this.save(streamRequest, operator);
+        // Save inlong stream
+        save(streamRequest, operator);
 
-        // 2 Save source info
+        // Save source info
         if (CollectionUtils.isNotEmpty(fullStreamRequest.getSourceInfo())) {
             for (SourceRequest source : fullStreamRequest.getSourceInfo()) {
                 sourceService.save(source, operator);
             }
         }
 
-        // 3. Save sink info
+        // Save sink info
         if (CollectionUtils.isNotEmpty(fullStreamRequest.getSinkInfo())) {
             for (SinkRequest sinkInfo : fullStreamRequest.getSinkInfo()) {
                 sinkService.save(sinkInfo, operator);
@@ -399,18 +422,16 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         streamMapper.deleteAllByGroupId(groupId);
 
         for (FullStreamRequest pageInfo : fullStreamRequestList) {
-            // 1.1 Delete the inlong stream extensions and fields corresponding to groupId and streamId
+            // Delete the inlong stream extensions and fields corresponding to groupId and streamId
             InlongStreamRequest streamInfo = pageInfo.getStreamInfo();
             String streamId = streamInfo.getInlongStreamId();
             streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
-
-            // 2. Delete all stream source
+            streamExtMapper.deleteAllByRelatedId(groupId, streamId);
+            //  Delete all stream source
             sourceService.deleteAll(groupId, streamId, operator);
-
-            // 3. Delete all stream sink
+            // Delete all stream sink
             sinkService.deleteAll(groupId, streamId, operator);
-
-            // 4. Save the inlong stream of this batch
+            // Save the inlong stream of this batch
             this.saveAll(pageInfo, operator);
         }
         LOGGER.info("success to batch save all stream page info");
@@ -441,23 +462,26 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         // Convert and encapsulate the paged results
         List<FullStreamResponse> responseList = new ArrayList<>(streamInfoList.size());
         for (InlongStreamInfo streamInfo : streamInfoList) {
-            // 2Set the field information of the inlong stream
+            // Set the field information of the inlong stream
             String streamId = streamInfo.getInlongStreamId();
             List<InlongStreamFieldInfo> streamFields = getStreamFields(groupId, streamId);
             streamInfo.setFieldList(streamFields);
+            List<InlongStreamExtInfo> streamExtInfos = CommonBeanUtils.copyListProperties(
+                    streamExtMapper.selectByRelatedId(groupId, streamId), InlongStreamExtInfo::new);
+            streamInfo.setExtList(streamExtInfos);
 
             FullStreamResponse pageInfo = new FullStreamResponse();
             pageInfo.setStreamInfo(streamInfo);
 
-            // 3. Query stream sources information
+            // Query stream sources information
             List<SourceResponse> sourceList = sourceService.listSource(groupId, streamId);
             pageInfo.setSourceInfo(sourceList);
 
-            // 4. Query various stream sinks and its extended information, field information
+            // Query various stream sinks and its extended information, field information
             List<SinkResponse> sinkList = sinkService.listSink(groupId, streamId);
             pageInfo.setSinkInfo(sinkList);
 
-            // 5. Add a single result to the paginated list
+            // Add a single result to the paginated list
             responseList.add(pageInfo);
         }
 
@@ -594,6 +618,32 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         streamFieldMapper.insertAll(list);
     }
 
+    /**
+     * Save or update the extension items of an inlong stream.
+     *
+     * @param groupId inlong group id written to every ext entity
+     * @param streamId inlong stream id written to every ext entity
+     * @param exts ext items to persist; the method returns without writing when this is null or empty
+     */
+    // NOTE(review): this method is package-private; with Spring's proxy-based transactions, @Transactional on a
+    // non-public or self-invoked method is typically ignored - confirm that the callers are already transactional.
+    @Transactional(rollbackFor = Throwable.class)
+    void saveOrUpdateExt(String groupId, String streamId, List<InlongStreamExtInfo> exts) {
+        LOGGER.info("begin to save or update inlong stream ext info, groupId={}, streamId={}, ext={}", groupId,
+                streamId, exts);
+        if (CollectionUtils.isEmpty(exts)) {
+            return;
+        }
+
+        List<InlongStreamExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongStreamExtEntity::new);
+        for (InlongStreamExtEntity entity : entityList) {
+            entity.setInlongGroupId(groupId);
+            entity.setInlongStreamId(streamId);
+        }
+        streamExtMapper.insertOnDuplicateKeyUpdate(entityList);
+        LOGGER.info("success to save or update inlong stream ext for groupId={}", groupId);
+    }
+
     /**
      * Check whether the inlong group status is temporary
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index d560e706b..29e01a414 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -57,7 +57,6 @@ import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -366,17 +365,15 @@ public class InlongGroupServiceImpl implements InlongGroupService {
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
-    public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> infoList) {
-        LOGGER.info("begin to save or update inlong group ext info, groupId={}, ext={}", groupId, infoList);
-        if (CollectionUtils.isEmpty(infoList)) {
+    public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> exts) {
+        LOGGER.info("begin to save or update inlong group ext info, groupId={}, ext={}", groupId, exts);
+        if (CollectionUtils.isEmpty(exts)) {
             return;
         }
 
-        List<InlongGroupExtEntity> entityList = CommonBeanUtils.copyListProperties(infoList, InlongGroupExtEntity::new);
-        Date date = new Date();
+        List<InlongGroupExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongGroupExtEntity::new);
         for (InlongGroupExtEntity entity : entityList) {
             entity.setInlongGroupId(groupId);
-            entity.setModifyTime(date);
         }
         groupExtMapper.insertOnDuplicateKeyUpdate(entityList);
         LOGGER.info("success to save or update inlong group ext for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 842ea887d..b14314be8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sort;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
@@ -141,7 +142,10 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
             List<SourceResponse> sourceResponses = sourceService.listSource(groupInfo.getInlongGroupId(),
                     streamInfo.getInlongStreamId());
             for (SourceResponse sourceResponse : sourceResponses) {
-                pulsarSourceResponse.setSerializationType(sourceResponse.getSerializationType());
+                if (StringUtils.isEmpty(pulsarSourceResponse.getSerializationType())
+                        && StringUtils.isNotEmpty(sourceResponse.getSerializationType())) {
+                    pulsarSourceResponse.setSerializationType(sourceResponse.getSerializationType());
+                }
                 if (SourceType.forType(sourceResponse.getSourceType()) == SourceType.KAFKA) {
                     pulsarSourceResponse.setPrimaryKey(((KafkaSourceResponse) sourceResponse).getPrimaryKey());
                 }
@@ -163,7 +167,8 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
         return nodes;
     }
 
-    private List<NodeRelationShip> createNodeRelationShipsForStream(List<SourceResponse> sourceResponses,
+    private List<NodeRelationShip> createNodeRelationShipsForStream(
+            List<SourceResponse> sourceResponses,
             List<SinkResponse> sinkResponses) {
         NodeRelationShip relationShip = new NodeRelationShip();
         List<String> inputs = sourceResponses.stream().map(sourceResponse -> sourceResponse.getSourceName())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 4cba1b7a1..4ce935da3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -20,28 +20,39 @@ package org.apache.inlong.manager.service.sort;
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.service.sort.util.DataFlowUtils;
+import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
+import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
-import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -53,10 +64,12 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
 
+    @Autowired
+    private StreamSourceService streamSourceService;
     @Autowired
     private StreamSinkService streamSinkService;
     @Autowired
-    private DataFlowUtils dataFlowUtils;
+    private CommonOperateService commonOperateService;
 
     @Override
     public TaskEvent event() {
@@ -70,6 +83,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
         if (groupOperateType == GroupOperateType.SUSPEND || groupOperateType == GroupOperateType.DELETE) {
             return ListenerResult.success();
         }
+        InlongGroupInfo groupInfo = form.getGroupInfo();
         InlongStreamInfo streamInfo = form.getStreamInfo();
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
@@ -78,25 +92,23 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
             log.warn("Sink not found by groupId={}", groupId);
             return ListenerResult.success();
         }
-        //TODO must be compile with new protocal of sort
         try {
-            InlongGroupInfo groupInfo = form.getGroupInfo();
-            Map<String, DataFlowInfo> dataFlowInfoMap = sinkResponses.stream().map(sink -> {
-                        DataFlowInfo flowInfo = dataFlowUtils.createDataFlow(groupInfo, sink);
-                        return Pair.of(sink.getInlongStreamId(), flowInfo);
-                    }
-            ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-            String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
-            InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+            List<SourceResponse> sourceResponses = createPulsarSources(groupInfo, streamInfo);
+            List<Node> nodes = createNodesForStream(sourceResponses, sinkResponses);
+            List<NodeRelationShip> nodeRelationShips = createNodeRelationShipsForStream(sourceResponses, sinkResponses);
+            StreamInfo sortStreamInfo = new StreamInfo(streamId, nodes, nodeRelationShips);
+            GroupInfo sortGroupInfo = new GroupInfo(groupId, Lists.newArrayList(sortStreamInfo));
+            String dataFlows = OBJECT_MAPPER.writeValueAsString(sortGroupInfo);
+            InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
             extInfo.setInlongGroupId(groupId);
-            String keyName = InlongGroupSettings.DATA_FLOW + ":" + streamId;
+            extInfo.setInlongStreamId(streamId);
+            String keyName = InlongGroupSettings.DATA_FLOW;
             extInfo.setKeyName(keyName);
             extInfo.setKeyValue(dataFlows);
-            if (groupInfo.getExtList() == null) {
+            if (streamInfo.getExtList() == null) {
-                groupInfo.setExtList(Lists.newArrayList());
+                streamInfo.setExtList(Lists.newArrayList()); // upsertDataFlow() works on streamInfo's ext list
             }
-            upsertDataFlow(groupInfo, extInfo, keyName);
+            upsertDataFlow(streamInfo, extInfo, keyName);
         } catch (Exception e) {
             log.error("create sort config failed for sink list={} of groupId={}, streamId={}", sinkResponses, groupId,
                     streamId, e);
@@ -105,9 +117,90 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
         return ListenerResult.success();
     }
 
-    private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo, String keyName) {
-        groupInfo.getExtList().removeIf(ext -> keyName.equals(ext.getKeyName()));
-        groupInfo.getExtList().add(extInfo);
+    /**
+     * Create the Pulsar source that represents the given stream.
+     *
+     * <p>The serialization type is taken from the first stream source that declares one, and the primary key
+     * is taken from the Kafka sources of the stream (the last one wins).
+     *
+     * @param groupInfo inlong group info, provides the MQ type and the namespace
+     * @param streamInfo inlong stream info, provides the source name, the topic and the field list
+     * @return a list that contains exactly one Pulsar source response
+     * @throws WorkflowListenerException if the MQ type of the group is not Pulsar
+     */
+    private List<SourceResponse> createPulsarSources(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
+        MQType mqType = MQType.forType(groupInfo.getMqType());
+        if (mqType != MQType.PULSAR) {
+            // String.format() expects %s here, not the SLF4J "{}" placeholder
+            String errMsg = String.format("Unsupported MqType=%s for Inlong", mqType);
+            log.error(errMsg);
+            throw new WorkflowListenerException(errMsg);
+        }
+        PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
+        PulsarSourceResponse pulsarSourceResponse = new PulsarSourceResponse();
+        pulsarSourceResponse.setSourceName(streamInfo.getInlongStreamId());
+        pulsarSourceResponse.setNamespace(groupInfo.getMqResource());
+        pulsarSourceResponse.setTopic(streamInfo.getMqResource());
+        pulsarSourceResponse.setAdminUrl(pulsarCluster.getAdminUrl());
+        pulsarSourceResponse.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+        List<SourceResponse> sourceResponses = streamSourceService.listSource(groupInfo.getInlongGroupId(),
+                streamInfo.getInlongStreamId());
+        for (SourceResponse sourceResponse : sourceResponses) {
+            if (StringUtils.isEmpty(pulsarSourceResponse.getSerializationType())
+                    && StringUtils.isNotEmpty(sourceResponse.getSerializationType())) {
+                pulsarSourceResponse.setSerializationType(sourceResponse.getSerializationType());
+            }
+            if (SourceType.forType(sourceResponse.getSourceType()) == SourceType.KAFKA) {
+                pulsarSourceResponse.setPrimaryKey(((KafkaSourceResponse) sourceResponse).getPrimaryKey());
+            }
+        }
+        pulsarSourceResponse.setScanStartupMode("earliest");
+        pulsarSourceResponse.setFieldList(streamInfo.getFieldList());
+        return Lists.newArrayList(pulsarSourceResponse);
+    }
+
+    /**
+     * Create the nodes of a stream: extract nodes for the sources, then load nodes for the sinks.
+     */
+    private List<Node> createNodesForStream(
+            List<SourceResponse> sourceResponses,
+            List<SinkResponse> sinkResponses) {
+        List<Node> nodes = Lists.newArrayList();
+        nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceResponses));
+        nodes.addAll(LoadNodeUtils.createLoadNodes(sinkResponses));
+        return nodes;
+    }
+
+    /**
+     * Create a single relationship whose inputs are all source names and whose outputs are all sink names.
+     */
+    private List<NodeRelationShip> createNodeRelationShipsForStream(
+            List<SourceResponse> sourceResponses,
+            List<SinkResponse> sinkResponses) {
+        NodeRelationShip relationShip = new NodeRelationShip();
+        List<String> inputs = sourceResponses.stream().map(SourceResponse::getSourceName)
+                .collect(Collectors.toList());
+        List<String> outputs = sinkResponses.stream().map(SinkResponse::getSinkName)
+                .collect(Collectors.toList());
+        relationShip.setInputs(inputs);
+        relationShip.setOutputs(outputs);
+        return Lists.newArrayList(relationShip);
+    }
+
+    /**
+     * Replace the ext item with the given key name in the stream's ext list, or append it if absent.
+     *
+     * @param streamInfo stream info whose ext list is updated in place
+     * @param extInfo ext item to put into the list
+     * @param keyName key name of the ext items to be replaced
+     */
+    private void upsertDataFlow(InlongStreamInfo streamInfo, InlongStreamExtInfo extInfo, String keyName) {
+        // Guard against a stream without an ext list, otherwise the calls below would fail with an NPE
+        if (streamInfo.getExtList() == null) {
+            streamInfo.setExtList(Lists.newArrayList());
+        }
+        streamInfo.getExtList().removeIf(ext -> keyName.equals(ext.getKeyName()));
+        streamInfo.getExtList().add(extInfo);
     }
 
     @Override
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 7552e6e2a..73e564d25 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -347,6 +347,23 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
     UNIQUE KEY `unique_inlong_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`)
 );
 
+-- ----------------------------
+-- Table structure for inlong_stream_ext: key/value extension items, one row per (group id, stream id, key name)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
+(
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `key_name`         varchar(256) NOT NULL COMMENT 'Configuration item name',
+    `key_value`        text COMMENT 'The value of the configuration item',
+    `is_deleted`       int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
+    KEY `index_stream_id` (`inlong_group_id`, `inlong_stream_id`)
+);
+
 -- ----------------------------
 -- Table structure for inlong_stream_field
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index b56efe35d..52adb2658 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -372,6 +372,24 @@ CREATE TABLE IF NOT EXISTS `inlong_stream`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream table';
 
+-- ----------------------------
+-- Table structure for inlong_stream_ext: key/value extension items, one row per (group id, stream id, key name)
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_stream_ext`
+(
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `key_name`         varchar(256) NOT NULL COMMENT 'Configuration item name',
+    `key_value`        text COMMENT 'The value of the configuration item',
+    `is_deleted`       int(11)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `stream_key_idx` (`inlong_group_id`, `inlong_stream_id`, `key_name`),
+    KEY `index_stream_id` (`inlong_group_id`, `inlong_stream_id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong stream extension table';
+
 -- ----------------------------
 -- Table structure for inlong_stream_field
 -- ----------------------------