You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/14 08:21:29 UTC

[incubator-inlong] branch master updated: [INLONG-2465][Manager] GetClusterConfig interface support select sinkParams (#2439)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2854756  [INLONG-2465][Manager] GetClusterConfig interface support select sinkParams (#2439)
2854756 is described below

commit 2854756dec0c8101abf0ec2b2fb73897570b25e5
Author: imvan <de...@pku.edu.cn>
AuthorDate: Mon Feb 14 16:21:23 2022 +0800

    [INLONG-2465][Manager] GetClusterConfig interface support select sinkParams (#2439)
    
    * Check sinkType before querying the sinkParam table
    
    * Update the size of param_value of the sink params and id params tables to 1K
---
 .../dao/entity/SortTaskSinkParamEntity.java        |  37 +++++++
 .../dao/mapper/SortTaskSinkParamEntityMapper.java  |  40 +++++++
 .../src/main/resources/generatorConfig.xml         |   9 +-
 .../mappers/SortTaskSinkParamEntityMapper.xml      | 117 +++++++++++++++++++++
 .../service/core/SortTaskSinkParamService.java     |  34 ++++++
 .../manager/service/core/impl/SortServiceImpl.java |  12 ++-
 .../core/impl/SortTaskIdParamServiceImpl.java      |  20 ++--
 .../core/impl/SortTaskSinkParamServiceImpl.java    |  53 ++++++++++
 .../manager-web/sql/apache_inlong_manager.sql      |  24 ++++-
 .../web/controller/openapi/SortControllerTest.java |  31 ++++--
 10 files changed, 351 insertions(+), 26 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortTaskSinkParamEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortTaskSinkParamEntity.java
new file mode 100644
index 0000000..b298c5f
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortTaskSinkParamEntity.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.entity;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Persistence entity for one row of the {@code sort_task_sink_param} table:
+ * a single key/value parameter attached to the sink of a sort task.
+ */
+@Builder
+@Data
+public class SortTaskSinkParamEntity implements Serializable {
+
+    /** Incremental primary key (column {@code id}). */
+    private Integer id;
+
+    /** Task name (column {@code task_name}). */
+    private String taskName;
+
+    /** Type of sink (column {@code sink_type}). */
+    private String sinkType;
+
+    /** Key of the param (column {@code param_key}). */
+    private String paramKey;
+
+    /** Value of the param (column {@code param_value}). */
+    private String paramValue;
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortTaskSinkParamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortTaskSinkParamEntityMapper.java
new file mode 100644
index 0000000..6e80ca0
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortTaskSinkParamEntityMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+/**
+ * MyBatis mapper for the {@code sort_task_sink_param} table.
+ */
+@Repository
+public interface SortTaskSinkParamEntityMapper {
+
+    /** Inserts a row, writing every column from {@code record}. */
+    int insert(SortTaskSinkParamEntity record);
+
+    /** Inserts a row, writing only the columns whose field in {@code record} is non-null. */
+    int insertSelective(SortTaskSinkParamEntity record);
+
+    /** Loads the row with the given primary key. */
+    SortTaskSinkParamEntity selectByPrimaryKey(Integer id);
+
+    /**
+     * Loads all sink param rows of a task.
+     *
+     * <p>Despite the method name, only the task name is passed in and the backing statement
+     * filters on {@code task_name} alone; the sink type is not part of the query.
+     */
+    List<SortTaskSinkParamEntity> selectByTaskNameAndType(String taskName);
+
+    /** Updates the non-null fields of {@code record} on the row identified by its id. */
+    int updateByPrimaryKeySelective(SortTaskSinkParamEntity record);
+
+    /** Updates every non-key column of the row identified by the id of {@code record}. */
+    int updateByPrimaryKey(SortTaskSinkParamEntity record);
+
+    /** Deletes the row with the given primary key. */
+    int deleteByPrimaryKey(Integer id);
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index b7d02df..942c638 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -224,7 +224,7 @@
                enableUpdateByPrimaryKey="true"
                enableDeleteByPrimaryKey="true" enableInsert="true"
                enableCountByExample="false" enableDeleteByExample="false"
-               enableSelectByExample="false" enableUpdateByExample="false"/>-->
+               enableSelectByExample="false" enableUpdateByExample="false"/>
 
         <table tableName="sort_task_id_param" domainObjectName="SortTaskIdParamEntity"
                enableSelectByPrimaryKey="true"
@@ -232,6 +232,13 @@
                enableDeleteByPrimaryKey="true" enableInsert="true"
                enableCountByExample="false" enableDeleteByExample="false"
                enableSelectByExample="false" enableUpdateByExample="false"/>
+               
+        <table tableName="sort_task_sink_param" domainObjectName="SortTaskSinkParamEntity"
+               enableSelectByPrimaryKey="true"
+               enableUpdateByPrimaryKey="true"
+               enableDeleteByPrimaryKey="true" enableInsert="true"
+               enableCountByExample="false" enableDeleteByExample="false"
+               enableSelectByExample="false" enableUpdateByExample="false"/>-->
 
     </context>
 </generatorConfiguration>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/SortTaskSinkParamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/SortTaskSinkParamEntityMapper.xml
new file mode 100644
index 0000000..ce8f1a7
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/SortTaskSinkParamEntityMapper.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper">
+  <!-- Column-to-property mapping for rows of sort_task_sink_param. -->
+  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity">
+    <id column="id" jdbcType="INTEGER" property="id" />
+    <result column="task_name" jdbcType="VARCHAR" property="taskName" />
+    <result column="sink_type" jdbcType="VARCHAR" property="sinkType" />
+    <result column="param_key" jdbcType="VARCHAR" property="paramKey" />
+    <result column="param_value" jdbcType="VARCHAR" property="paramValue" />
+  </resultMap>
+  <!-- Column list shared by the select statements in this mapper. -->
+  <sql id="Base_Column_List">
+    id, task_name, sink_type, param_key, param_value
+  </sql>
+  <!-- Fetch a single row by primary key. -->
+  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+    select 
+    <include refid="Base_Column_List" />
+    from sort_task_sink_param
+    where id = #{id,jdbcType=INTEGER}
+  </select>
+  <!-- Delete a single row by primary key. -->
+  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    delete from sort_task_sink_param
+    where id = #{id,jdbcType=INTEGER}
+  </delete>
+  <!-- Insert a row, writing every column (including id) from the entity. -->
+  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity">
+    insert into sort_task_sink_param (id, task_name, sink_type, 
+      param_key, param_value)
+    values (#{id,jdbcType=INTEGER}, #{taskName,jdbcType=VARCHAR}, #{sinkType,jdbcType=VARCHAR}, 
+      #{paramKey,jdbcType=VARCHAR}, #{paramValue,jdbcType=VARCHAR})
+  </insert>
+  <!-- Insert a row, writing only the columns whose entity field is non-null. -->
+  <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity">
+    insert into sort_task_sink_param
+    <trim prefix="(" suffix=")" suffixOverrides=",">
+      <if test="id != null">
+        id,
+      </if>
+      <if test="taskName != null">
+        task_name,
+      </if>
+      <if test="sinkType != null">
+        sink_type,
+      </if>
+      <if test="paramKey != null">
+        param_key,
+      </if>
+      <if test="paramValue != null">
+        param_value,
+      </if>
+    </trim>
+    <trim prefix="values (" suffix=")" suffixOverrides=",">
+      <if test="id != null">
+        #{id,jdbcType=INTEGER},
+      </if>
+      <if test="taskName != null">
+        #{taskName,jdbcType=VARCHAR},
+      </if>
+      <if test="sinkType != null">
+        #{sinkType,jdbcType=VARCHAR},
+      </if>
+      <if test="paramKey != null">
+        #{paramKey,jdbcType=VARCHAR},
+      </if>
+      <if test="paramValue != null">
+        #{paramValue,jdbcType=VARCHAR},
+      </if>
+    </trim>
+  </insert>
+  <!-- Update only the non-null entity fields on the row matching the entity id. -->
+  <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity">
+    update sort_task_sink_param
+    <set>
+      <if test="taskName != null">
+        task_name = #{taskName,jdbcType=VARCHAR},
+      </if>
+      <if test="sinkType != null">
+        sink_type = #{sinkType,jdbcType=VARCHAR},
+      </if>
+      <if test="paramKey != null">
+        param_key = #{paramKey,jdbcType=VARCHAR},
+      </if>
+      <if test="paramValue != null">
+        param_value = #{paramValue,jdbcType=VARCHAR},
+      </if>
+    </set>
+    where id = #{id,jdbcType=INTEGER}
+  </update>
+  <!-- Update every non-key column on the row matching the entity id. -->
+  <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity">
+    update sort_task_sink_param
+    set task_name = #{taskName,jdbcType=VARCHAR},
+      sink_type = #{sinkType,jdbcType=VARCHAR},
+      param_key = #{paramKey,jdbcType=VARCHAR},
+      param_value = #{paramValue,jdbcType=VARCHAR}
+    where id = #{id,jdbcType=INTEGER}
+  </update>
+  <!-- Fetch all rows of a task. Only task_name appears in the WHERE clause; sink_type is not filtered here. -->
+  <select id="selectByTaskNameAndType" parameterType="java.lang.String" resultMap="BaseResultMap">
+    select
+    <include refid="Base_Column_List" />
+    from sort_task_sink_param
+    where task_name = #{taskName,jdbcType=VARCHAR}
+  </select>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java
new file mode 100644
index 0000000..6526d65
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import java.util.Map;
+
+/**
+ * Service for reading the sink params of a sort task.
+ */
+public interface SortTaskSinkParamService {
+
+    /**
+     * Select the sink params of a task and sink type as a key/value map.
+     * @param taskName Name of task;
+     * @param sinkType Type of sink;
+     * @return Map of sink param key to sink param value.
+     */
+    Map<String, String> selectByTaskNameAndType(String taskName, String sinkType);
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 3bea2dc..852763d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.Sort
 import org.apache.inlong.manager.service.core.SortClusterConfigService;
 import org.apache.inlong.manager.service.core.SortTaskIdParamService;
 import org.apache.inlong.manager.service.core.SortService;
+import org.apache.inlong.manager.service.core.SortTaskSinkParamService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +45,8 @@ public class SortServiceImpl implements SortService {
 
     @Autowired private SortTaskIdParamService sortTaskIdParamService;
 
+    @Autowired private SortTaskSinkParamService sortTaskSinkParamService;
+
     @Override
     public SortClusterConfigResponse getClusterConfig(String clusterName, String md5) {
         LOGGER.info("start getClusterConfig");
@@ -78,14 +81,17 @@ public class SortServiceImpl implements SortService {
     }
 
     private SortTaskConfig getTaskConfig(SortClusterConfigEntity clusterConfig) {
+        SinkType sinkType = SinkType.valueOf(clusterConfig.getSinkType().toUpperCase());
         List<Map<String, String>> idParams =
                 sortTaskIdParamService.selectByTaskName(clusterConfig.getTaskName());
-        // TODO add method that get sink params
+        Map<String, String> sinkParams =
+                sortTaskSinkParamService
+                        .selectByTaskNameAndType(clusterConfig.getTaskName(), clusterConfig.getSinkType());
         return SortTaskConfig.builder()
                 .taskName(clusterConfig.getTaskName())
-                .sinkType(SinkType.valueOf(clusterConfig.getSinkType().toUpperCase()))
+                .sinkType(sinkType)
                 .idParams(idParams)
-                .sinkParams(null)
+                .sinkParams(sinkParams)
                 .build();
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
index eab99c6..27723c8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
@@ -38,22 +38,18 @@ public class SortTaskIdParamServiceImpl implements SortTaskIdParamService {
 
     @Autowired private SortTaskIdParamEntityMapper sortTaskIdParamEntityMapper;
 
-    private final Map<String, Map<String, String>> idParams = new HashMap<>();
-
     @Override
     public List<Map<String, String>> selectByTaskName(String taskName) {
         List<SortTaskIdParamEntity> taskIdParamEntityList =
                 sortTaskIdParamEntityMapper.selectByTaskName(taskName);
-        idParams.clear();
-        taskIdParamEntityList.forEach(this::addParam);
+        Map<String, Map<String, String>> idParams = new HashMap<>();
+        taskIdParamEntityList.forEach(entity -> {
+            Map<String, String> idParam =
+                    idParams.computeIfAbsent(entity.getKey(), key -> new HashMap<>());
+            idParam.put(entity.getParamKey(), entity.getParamValue());
+            idParam.putIfAbsent(KEY_GROUP_ID, entity.getGroupId());
+            idParam.putIfAbsent(KEY_STREAM_ID, entity.getStreamId());
+        });
         return new ArrayList<>(idParams.values());
     }
-
-    private void addParam(SortTaskIdParamEntity entity) {
-        Map<String, String> idParam =
-                idParams.computeIfAbsent(entity.getKey(), key -> new HashMap<>());
-        idParam.put(entity.getParamKey(), entity.getParamValue());
-        idParam.putIfAbsent(KEY_GROUP_ID, entity.getGroupId());
-        idParam.putIfAbsent(KEY_STREAM_ID, entity.getStreamId());
-    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
new file mode 100644
index 0000000..39b7e8a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.impl;
+
+import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
+import org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper;
+import org.apache.inlong.manager.service.core.SortTaskSinkParamService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sort sink params service implementation.
+ *
+ * <p>Reads the sink param rows of a task through {@link SortTaskSinkParamEntityMapper} and
+ * flattens them into a single key/value map.
+ */
+@Service
+public class SortTaskSinkParamServiceImpl implements SortTaskSinkParamService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SortTaskSinkParamServiceImpl.class);
+
+    /** Key under which the sink type itself is reported in the returned map. */
+    private static final String KEY_SINK_TYPE = "type";
+
+    @Autowired
+    private SortTaskSinkParamEntityMapper sortTaskSinkParamEntityMapper;
+
+    /**
+     * Select the sink params of a task that belong to the given sink type.
+     *
+     * <p>The mapper query filters on task name only, so each row is additionally matched
+     * against {@code sinkType} here (ignoring case). When {@code sinkType} is {@code null}
+     * no filtering is applied and every row of the task is returned.
+     *
+     * @param taskName Name of task.
+     * @param sinkType Type of sink.
+     * @return Map of param key to param value, plus a {@code "type"} entry holding
+     *         {@code sinkType}; a stored param whose key is {@code "type"} replaces that entry.
+     */
+    @Override
+    public Map<String, String> selectByTaskNameAndType(String taskName, String sinkType) {
+        LOGGER.info("task name is {}, sink type is {}", taskName, sinkType);
+        List<SortTaskSinkParamEntity> taskSinkParamEntityList =
+                sortTaskSinkParamEntityMapper.selectByTaskNameAndType(taskName);
+        Map<String, String> sinkParams = new HashMap<>();
+        sinkParams.put(KEY_SINK_TYPE, sinkType);
+        for (SortTaskSinkParamEntity entity : taskSinkParamEntityList) {
+            // Skip rows stored under a different sink type for the same task.
+            if (sinkType != null && !sinkType.equalsIgnoreCase(entity.getSinkType())) {
+                continue;
+            }
+            sinkParams.put(entity.getParamKey(), entity.getParamValue());
+        }
+        return sinkParams;
+    }
+
+}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index eb3ce43..5c11a74 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1145,7 +1145,8 @@ CREATE TABLE `sort_cluster_config`
     `cluster_name`  varchar(128)  NOT NULL COMMENT 'Cluster name',
     `task_name`     varchar(128)  NOT NULL COMMENT 'Task name',
     `sink_type`     varchar(128)  NOT NULL COMMENT 'Type of sink',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_sort_cluster_config` (`cluster_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Sort cluster config table';
 
@@ -1160,9 +1161,26 @@ CREATE TABLE `sort_task_id_param`
     `group_id`         varchar(128)  NOT NULL COMMENT 'Inlong group id',
     `stream_id`        varchar(128)  NULL COMMENT 'Inlong stream id',
     `param_key`        varchar(128)  NOT NULL COMMENT 'Key of param',
-    `param_value`      varchar(128)  NOT NULL COMMENT 'Value of param',
-    PRIMARY KEY (`id`)
+    `param_value`      varchar(1024)  NOT NULL COMMENT 'Value of param',
+    PRIMARY KEY (`id`),
+    KEY `index_sort_task_id_param` (`task_name`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Sort task id params table';
 
+-- ----------------------------
+-- Table structure for sort_task_sink_param
+-- One row per key/value param of a sort task's sink;
+-- rows are indexed by (task_name, sink_type).
+-- ----------------------------
+DROP TABLE IF EXISTS `sort_task_sink_param`;
+CREATE TABLE `sort_task_sink_param`
+(
+    `id`               int(11)       NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `task_name`        varchar(128)  NOT NULL COMMENT 'Task name',
+    `sink_type`        varchar(128)  NOT NULL COMMENT 'Type of sink',
+    `param_key`        varchar(128)  NOT NULL COMMENT 'Key of param',
+    `param_value`      varchar(1024)  NOT NULL COMMENT 'Value of param',
+    PRIMARY KEY (`id`),
+    KEY `index_sort_task_sink_params` (`task_name`, `sink_type`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Sort task sink params table';
+
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
index dddd183..e5ab2a6 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
@@ -17,14 +17,12 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
 import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
 import org.apache.inlong.manager.dao.entity.SortTaskIdParamEntity;
+import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
 import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SortTaskIdParamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,6 +35,10 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.context.WebApplicationContext;
 
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
 @RunWith(SpringRunner.class)
 @SpringBootTest
 public class SortControllerTest {
@@ -48,6 +50,8 @@ public class SortControllerTest {
     // todo Service do not support insert method now, use mappers to insert data.
     @Autowired private SortTaskIdParamEntityMapper taskIdParamEntityMapper;
 
+    @Autowired private SortTaskSinkParamEntityMapper taskSinkParamEntityMapper;
+
     @Autowired private SortClusterConfgiEntityMapper sortClusterConfgiEntityMapper;
 
     @Before
@@ -56,8 +60,12 @@ public class SortControllerTest {
         taskIdParamEntityMapper.insert(this.prepareIdParamsEntity("testTask1", 1));
         taskIdParamEntityMapper.insert(this.prepareIdParamsEntity("testTask1", 2));
         taskIdParamEntityMapper.insert(this.prepareIdParamsEntity("testTask2", 1));
+        taskSinkParamEntityMapper
+                .insert(this.prepareSinkParamsEntity("testTask1", "kafka", 1));
+        taskSinkParamEntityMapper
+                .insert(this.prepareSinkParamsEntity("testTask2", "pulsar", 1));
         sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask1", "kafka"));
-        sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask2", "kafka"));
+        sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask2", "pulsar"));
     }
 
     /**
@@ -102,8 +110,8 @@ public class SortControllerTest {
                 .groupId(String.valueOf(idx))
                 .streamId(String.valueOf(idx))
                 .taskName(task)
-                .paramKey("paramKey" + idx)
-                .paramValue("paramValue" + idx)
+                .paramKey("idParamKey " + idx)
+                .paramValue("idParamValue " + idx)
                 .build();
     }
 
@@ -114,4 +122,13 @@ public class SortControllerTest {
                 .sinkType(sinkType)
                 .build();
     }
+
+    private SortTaskSinkParamEntity prepareSinkParamsEntity(String task, String sinkType, int idx) {
+        return SortTaskSinkParamEntity.builder()
+                .sinkType(sinkType)
+                .paramKey("sinkParamKey " + idx)
+                .paramValue("sinkParamValue " + idx)
+                .taskName(task)
+                .build();
+    }
 }