You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/14 08:21:29 UTC
[incubator-inlong] branch master updated: [INLONG-2465][Manager] GetClusterConfig interface support select sinkParams (#2439)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2854756 [INLONG-2465][Manager] GetClusterConfig interface support select sinkParams (#2439)
2854756 is described below
commit 2854756dec0c8101abf0ec2b2fb73897570b25e5
Author: imvan <de...@pku.edu.cn>
AuthorDate: Mon Feb 14 16:21:23 2022 +0800
[INLONG-2465][Manager] GetClusterConfig interface support select sinkParams (#2439)
* Check sinkType before querying the sinkParam table
* Increase the size of param_value in the sink params and id params tables to 1K
---
.../dao/entity/SortTaskSinkParamEntity.java | 37 +++++++
.../dao/mapper/SortTaskSinkParamEntityMapper.java | 40 +++++++
.../src/main/resources/generatorConfig.xml | 9 +-
.../mappers/SortTaskSinkParamEntityMapper.xml | 117 +++++++++++++++++++++
.../service/core/SortTaskSinkParamService.java | 34 ++++++
.../manager/service/core/impl/SortServiceImpl.java | 12 ++-
.../core/impl/SortTaskIdParamServiceImpl.java | 20 ++--
.../core/impl/SortTaskSinkParamServiceImpl.java | 53 ++++++++++
.../manager-web/sql/apache_inlong_manager.sql | 24 ++++-
.../web/controller/openapi/SortControllerTest.java | 31 ++++--
10 files changed, 351 insertions(+), 26 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortTaskSinkParamEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortTaskSinkParamEntity.java
new file mode 100644
index 0000000..b298c5f
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/SortTaskSinkParamEntity.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.entity;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+public class SortTaskSinkParamEntity implements Serializable { // one row of table sort_task_sink_param
+ private Integer id; // id column, the table's primary key
+
+ private String taskName; // task_name column
+
+ private String sinkType; // sink_type column
+
+ private String paramKey; // param_key column
+
+ private String paramValue; // param_value column
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortTaskSinkParamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortTaskSinkParamEntityMapper.java
new file mode 100644
index 0000000..6e80ca0
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/SortTaskSinkParamEntityMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public interface SortTaskSinkParamEntityMapper { // MyBatis mapper for table sort_task_sink_param
+ int deleteByPrimaryKey(Integer id); // delete the row with this id
+
+ int insert(SortTaskSinkParamEntity record); // insert all five columns, null fields included
+
+ int insertSelective(SortTaskSinkParamEntity record); // insert only the non-null fields
+
+ SortTaskSinkParamEntity selectByPrimaryKey(Integer id); // select the row with this id
+
+ int updateByPrimaryKeySelective(SortTaskSinkParamEntity record); // update non-null fields by id
+
+ int updateByPrimaryKey(SortTaskSinkParamEntity record); // update every non-id column by id
+
+ List<SortTaskSinkParamEntity> selectByTaskNameAndType(String taskName); // NOTE(review): matches task_name only
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index b7d02df..942c638 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -224,7 +224,7 @@
enableUpdateByPrimaryKey="true"
enableDeleteByPrimaryKey="true" enableInsert="true"
enableCountByExample="false" enableDeleteByExample="false"
- enableSelectByExample="false" enableUpdateByExample="false"/>-->
+ enableSelectByExample="false" enableUpdateByExample="false"/>
<table tableName="sort_task_id_param" domainObjectName="SortTaskIdParamEntity"
enableSelectByPrimaryKey="true"
@@ -232,6 +232,13 @@
enableDeleteByPrimaryKey="true" enableInsert="true"
enableCountByExample="false" enableDeleteByExample="false"
enableSelectByExample="false" enableUpdateByExample="false"/>
+
+ <table tableName="sort_task_sink_param" domainObjectName="SortTaskSinkParamEntity"
+ enableSelectByPrimaryKey="true"
+ enableUpdateByPrimaryKey="true"
+ enableDeleteByPrimaryKey="true" enableInsert="true"
+ enableCountByExample="false" enableDeleteByExample="false"
+ enableSelectByExample="false" enableUpdateByExample="false"/>-->
</context>
</generatorConfiguration>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/SortTaskSinkParamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/SortTaskSinkParamEntityMapper.xml
new file mode 100644
index 0000000..ce8f1a7
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/SortTaskSinkParamEntityMapper.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper"> <!-- statements for table sort_task_sink_param -->
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity"> <!-- column to property mapping used by both selects -->
+ <id column="id" jdbcType="INTEGER" property="id" />
+ <result column="task_name" jdbcType="VARCHAR" property="taskName" />
+ <result column="sink_type" jdbcType="VARCHAR" property="sinkType" />
+ <result column="param_key" jdbcType="VARCHAR" property="paramKey" />
+ <result column="param_value" jdbcType="VARCHAR" property="paramValue" />
+ </resultMap>
+ <sql id="Base_Column_List"> <!-- column list included by the select statements -->
+ id, task_name, sink_type, param_key, param_value
+ </sql>
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List" />
+ from sort_task_sink_param
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete from sort_task_sink_param
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity"> <!-- writes all five columns, id included -->
+ insert into sort_task_sink_param (id, task_name, sink_type,
+ param_key, param_value)
+ values (#{id,jdbcType=INTEGER}, #{taskName,jdbcType=VARCHAR}, #{sinkType,jdbcType=VARCHAR},
+ #{paramKey,jdbcType=VARCHAR}, #{paramValue,jdbcType=VARCHAR})
+ </insert>
+ <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity"> <!-- writes only the non-null properties -->
+ insert into sort_task_sink_param
+ <trim prefix="(" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ id,
+ </if>
+ <if test="taskName != null">
+ task_name,
+ </if>
+ <if test="sinkType != null">
+ sink_type,
+ </if>
+ <if test="paramKey != null">
+ param_key,
+ </if>
+ <if test="paramValue != null">
+ param_value,
+ </if>
+ </trim>
+ <trim prefix="values (" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ #{id,jdbcType=INTEGER},
+ </if>
+ <if test="taskName != null">
+ #{taskName,jdbcType=VARCHAR},
+ </if>
+ <if test="sinkType != null">
+ #{sinkType,jdbcType=VARCHAR},
+ </if>
+ <if test="paramKey != null">
+ #{paramKey,jdbcType=VARCHAR},
+ </if>
+ <if test="paramValue != null">
+ #{paramValue,jdbcType=VARCHAR},
+ </if>
+ </trim>
+ </insert>
+ <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity"> <!-- sets only the non-null properties -->
+ update sort_task_sink_param
+ <set>
+ <if test="taskName != null">
+ task_name = #{taskName,jdbcType=VARCHAR},
+ </if>
+ <if test="sinkType != null">
+ sink_type = #{sinkType,jdbcType=VARCHAR},
+ </if>
+ <if test="paramKey != null">
+ param_key = #{paramKey,jdbcType=VARCHAR},
+ </if>
+ <if test="paramValue != null">
+ param_value = #{paramValue,jdbcType=VARCHAR},
+ </if>
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity"> <!-- sets every non-id column, nulls included -->
+ update sort_task_sink_param
+ set task_name = #{taskName,jdbcType=VARCHAR},
+ sink_type = #{sinkType,jdbcType=VARCHAR},
+ param_key = #{paramKey,jdbcType=VARCHAR},
+ param_value = #{paramValue,jdbcType=VARCHAR}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <select id="selectByTaskNameAndType" parameterType="java.lang.String" resultMap="BaseResultMap"> <!-- NOTE(review): despite the id, the WHERE clause matches task_name only; sink_type is not filtered -->
+ select
+ <include refid="Base_Column_List" />
+ from sort_task_sink_param
+ where task_name = #{taskName,jdbcType=VARCHAR}
+ </select>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java
new file mode 100644
index 0000000..6526d65
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import java.util.Map;
+
+/**
+ * Sort sink params service.
+ */
+public interface SortTaskSinkParamService {
+
+ /**
+ * Select the sink params of a task and return them as a key-value map.
+ * @param taskName Name of the task whose sink params are selected
+ * @param sinkType Type of the task's sink
+ * @return Map from param key to param value
+ */
+ Map<String, String> selectByTaskNameAndType(String taskName, String sinkType);
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 3bea2dc..852763d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.pojo.sort.SortClusterConfigResponse.Sort
import org.apache.inlong.manager.service.core.SortClusterConfigService;
import org.apache.inlong.manager.service.core.SortTaskIdParamService;
import org.apache.inlong.manager.service.core.SortService;
+import org.apache.inlong.manager.service.core.SortTaskSinkParamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +45,8 @@ public class SortServiceImpl implements SortService {
@Autowired private SortTaskIdParamService sortTaskIdParamService;
+ @Autowired private SortTaskSinkParamService sortTaskSinkParamService;
+
@Override
public SortClusterConfigResponse getClusterConfig(String clusterName, String md5) {
LOGGER.info("start getClusterConfig");
@@ -78,14 +81,17 @@ public class SortServiceImpl implements SortService {
}
private SortTaskConfig getTaskConfig(SortClusterConfigEntity clusterConfig) {
+ SinkType sinkType = SinkType.valueOf(clusterConfig.getSinkType().toUpperCase());
List<Map<String, String>> idParams =
sortTaskIdParamService.selectByTaskName(clusterConfig.getTaskName());
- // TODO add method that get sink params
+ Map<String, String> sinkParams =
+ sortTaskSinkParamService
+ .selectByTaskNameAndType(clusterConfig.getTaskName(), clusterConfig.getSinkType());
return SortTaskConfig.builder()
.taskName(clusterConfig.getTaskName())
- .sinkType(SinkType.valueOf(clusterConfig.getSinkType().toUpperCase()))
+ .sinkType(sinkType)
.idParams(idParams)
- .sinkParams(null)
+ .sinkParams(sinkParams)
.build();
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
index eab99c6..27723c8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
@@ -38,22 +38,18 @@ public class SortTaskIdParamServiceImpl implements SortTaskIdParamService {
@Autowired private SortTaskIdParamEntityMapper sortTaskIdParamEntityMapper;
- private final Map<String, Map<String, String>> idParams = new HashMap<>();
-
@Override
public List<Map<String, String>> selectByTaskName(String taskName) {
List<SortTaskIdParamEntity> taskIdParamEntityList =
sortTaskIdParamEntityMapper.selectByTaskName(taskName);
- idParams.clear();
- taskIdParamEntityList.forEach(this::addParam);
+ Map<String, Map<String, String>> idParams = new HashMap<>();
+ taskIdParamEntityList.forEach(entity -> {
+ Map<String, String> idParam =
+ idParams.computeIfAbsent(entity.getKey(), key -> new HashMap<>());
+ idParam.put(entity.getParamKey(), entity.getParamValue());
+ idParam.putIfAbsent(KEY_GROUP_ID, entity.getGroupId());
+ idParam.putIfAbsent(KEY_STREAM_ID, entity.getStreamId());
+ });
return new ArrayList<>(idParams.values());
}
-
- private void addParam(SortTaskIdParamEntity entity) {
- Map<String, String> idParam =
- idParams.computeIfAbsent(entity.getKey(), key -> new HashMap<>());
- idParam.put(entity.getParamKey(), entity.getParamValue());
- idParam.putIfAbsent(KEY_GROUP_ID, entity.getGroupId());
- idParam.putIfAbsent(KEY_STREAM_ID, entity.getStreamId());
- }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
new file mode 100644
index 0000000..39b7e8a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.impl;
+
+import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
+import org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper;
+import org.apache.inlong.manager.service.core.SortTaskSinkParamService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Sort sink params service implementation: loads a task's sink param rows and flattens them into one map. */
+@Service
+public class SortTaskSinkParamServiceImpl implements SortTaskSinkParamService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SortTaskSinkParamServiceImpl.class);
+
+ private static final String KEY_SINK_TYPE = "type"; // map key under which the sink type is returned
+
+ @Autowired
+ private SortTaskSinkParamEntityMapper sortTaskSinkParamEntityMapper;
+
+ @Override
+ public Map<String, String> selectByTaskNameAndType(String taskName, String sinkType) {
+ LOGGER.info("task name is {}, sink type is {}", taskName, sinkType);
+ List<SortTaskSinkParamEntity> taskSinkParamEntityList =
+ sortTaskSinkParamEntityMapper.selectByTaskNameAndType(taskName); // NOTE(review): sinkType is not passed on
+ Map<String, String> sinkParams = new HashMap<>(); // flat param key -> param value view of the rows
+ sinkParams.put(KEY_SINK_TYPE, sinkType); // seeded first, so a row whose param key is "type" replaces it
+ taskSinkParamEntityList.forEach(entity -> sinkParams.put(entity.getParamKey(), entity.getParamValue()));
+ return sinkParams;
+ }
+
+}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index eb3ce43..5c11a74 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1145,7 +1145,8 @@ CREATE TABLE `sort_cluster_config`
`cluster_name` varchar(128) NOT NULL COMMENT 'Cluster name',
`task_name` varchar(128) NOT NULL COMMENT 'Task name',
`sink_type` varchar(128) NOT NULL COMMENT 'Type of sink',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_sort_cluster_config` (`cluster_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Sort cluster config table';
@@ -1160,9 +1161,26 @@ CREATE TABLE `sort_task_id_param`
`group_id` varchar(128) NOT NULL COMMENT 'Inlong group id',
`stream_id` varchar(128) NULL COMMENT 'Inlong stream id',
`param_key` varchar(128) NOT NULL COMMENT 'Key of param',
- `param_value` varchar(128) NOT NULL COMMENT 'Value of param',
- PRIMARY KEY (`id`)
+ `param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
+ PRIMARY KEY (`id`),
+ KEY `index_sort_task_id_param` (`task_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Sort task id params table';
+-- ----------------------------
+-- Table structure for sort_task_sink_param (key/value sink params of sort tasks)
+-- ----------------------------
+DROP TABLE IF EXISTS `sort_task_sink_param`; -- any existing table and its rows are removed first
+CREATE TABLE `sort_task_sink_param`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `task_name` varchar(128) NOT NULL COMMENT 'Task name',
+ `sink_type` varchar(128) NOT NULL COMMENT 'Type of sink',
+ `param_key` varchar(128) NOT NULL COMMENT 'Key of param',
+ `param_value` varchar(1024) NOT NULL COMMENT 'Value of param',
+ PRIMARY KEY (`id`),
+ KEY `index_sort_task_sink_params` (`task_name`, `sink_type`) -- non-unique lookup index
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Sort task sink params table';
+
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
index dddd183..e5ab2a6 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
@@ -17,14 +17,12 @@
package org.apache.inlong.manager.web.controller.openapi;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
import org.apache.inlong.manager.dao.entity.SortTaskIdParamEntity;
+import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortTaskIdParamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -37,6 +35,10 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.context.WebApplicationContext;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
@RunWith(SpringRunner.class)
@SpringBootTest
public class SortControllerTest {
@@ -48,6 +50,8 @@ public class SortControllerTest {
// todo Service do not support insert method now, use mappers to insert data.
@Autowired private SortTaskIdParamEntityMapper taskIdParamEntityMapper;
+ @Autowired private SortTaskSinkParamEntityMapper taskSinkParamEntityMapper;
+
@Autowired private SortClusterConfgiEntityMapper sortClusterConfgiEntityMapper;
@Before
@@ -56,8 +60,12 @@ public class SortControllerTest {
taskIdParamEntityMapper.insert(this.prepareIdParamsEntity("testTask1", 1));
taskIdParamEntityMapper.insert(this.prepareIdParamsEntity("testTask1", 2));
taskIdParamEntityMapper.insert(this.prepareIdParamsEntity("testTask2", 1));
+ taskSinkParamEntityMapper
+ .insert(this.prepareSinkParamsEntity("testTask1", "kafka", 1));
+ taskSinkParamEntityMapper
+ .insert(this.prepareSinkParamsEntity("testTask2", "pulsar", 1));
sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask1", "kafka"));
- sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask2", "kafka"));
+ sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask2", "pulsar"));
}
/**
@@ -102,8 +110,8 @@ public class SortControllerTest {
.groupId(String.valueOf(idx))
.streamId(String.valueOf(idx))
.taskName(task)
- .paramKey("paramKey" + idx)
- .paramValue("paramValue" + idx)
+ .paramKey("idParamKey " + idx)
+ .paramValue("idParamValue " + idx)
.build();
}
@@ -114,4 +122,13 @@ public class SortControllerTest {
.sinkType(sinkType)
.build();
}
+
+ private SortTaskSinkParamEntity prepareSinkParamsEntity(String task, String sinkType, int idx) { // test fixture row
+ return SortTaskSinkParamEntity.builder()
+ .sinkType(sinkType)
+ .paramKey("sinkParamKey " + idx) // e.g. "sinkParamKey 1" when idx is 1
+ .paramValue("sinkParamValue " + idx) // e.g. "sinkParamValue 1" when idx is 1
+ .taskName(task)
+ .build(); // id is never set on the builder
+ }
}