You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/06 09:13:42 UTC
[inlong] branch master updated: [INLONG-4177][Manager] Refactor getClusterConfig API for Sort Standalone (#4200)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 44d007c31 [INLONG-4177][Manager] Refactor getClusterConfig API for Sort Standalone (#4200)
44d007c31 is described below
commit 44d007c310aee60ca6ff8f5090457ab16b4fa783
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Jul 6 17:13:36 2022 +0800
[INLONG-4177][Manager] Refactor getClusterConfig API for Sort Standalone (#4200)
---
.../common/pojo/sortstandalone/SortIdInfo.java} | 24 +-
.../common/pojo/sortstandalone/SortSinkInfo.java} | 24 +-
.../common/pojo/sortstandalone/SortTaskInfo.java} | 25 +-
.../manager/dao/mapper/DataNodeEntityMapper.java | 2 +
.../manager/dao/mapper/StreamSinkEntityMapper.java | 16 ++
.../resources/mappers/DataNodeEntityMapper.xml | 10 +
.../resources/mappers/StreamSinkEntityMapper.xml | 23 ++
...IdParamService.java => SortClusterService.java} | 16 +-
.../core/impl/SortClusterConfigServiceImpl.java | 41 ---
.../service/core/impl/SortClusterServiceImpl.java | 309 +++++++++++++++++++++
.../manager/service/core/impl/SortServiceImpl.java | 88 +-----
.../core/impl/SortTaskIdParamServiceImpl.java | 57 ----
.../core/impl/SortTaskSinkParamServiceImpl.java | 52 ----
.../web/controller/openapi/SortControllerTest.java | 30 +-
14 files changed, 400 insertions(+), 317 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortIdInfo.java
similarity index 65%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortIdInfo.java
index 9cb601c83..2cbb26593 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskSinkParamService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortIdInfo.java
@@ -15,21 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.pojo.sortstandalone;
-import java.util.Map;
+import lombok.Data;
-/**
- * Sort sink params service.
- */
-public interface SortTaskSinkParamService {
-
- /**
- * Select all sink params by task name and sink type.
- *
- * @param taskName Name of task;
- * @param sinkType Type of sink;
- * @return map of sink params.
- */
- Map<String, String> selectByTaskNameAndType(String taskName, String sinkType);
+@Data
+public class SortIdInfo {
+ private static final long serialVersionUID = 1L;
+ String sortTaskName;
+ String inlongGroupId;
+ String inlongStreamId;
+ String extParams;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortSinkInfo.java
similarity index 62%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortSinkInfo.java
index 88a783b50..38ce0f7c0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterConfigService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortSinkInfo.java
@@ -15,22 +15,14 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.pojo.sortstandalone;
-import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
+import lombok.Data;
-import java.util.List;
-
-/**
- * Sort cluster config service.
- */
-public interface SortClusterConfigService {
-
- /**
- * Select list of task by cluster name.
- *
- * @param clusterName Name of sort cluster.
- * @return List of tasks, including task name and sink type.
- */
- List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName);
+@Data
+public class SortSinkInfo {
+ private static final long serialVersionUID = 1L;
+ String name;
+ String type;
+ String extParams;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortTaskInfo.java
similarity index 67%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortTaskInfo.java
index 12e3e271f..19da29a2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sortstandalone/SortTaskInfo.java
@@ -15,21 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.common.pojo.sortstandalone;
-import java.util.List;
-import java.util.Map;
+import lombok.Data;
-/**
- * Sort id params service.
- */
-public interface SortTaskIdParamService {
-
- /**
- * Select all id params by task name.
- *
- * @param taskName WorkflowTask name.
- * @return List of all id params.
- */
- List<Map<String, String>> selectByTaskName(String taskName);
+@Data
+public class SortTaskInfo {
+ private static final long serialVersionUID = 1L;
+ String sortClusterName;
+ String sortTaskName;
+ String sortConsumerGroup;
+ String sinkType;
+ String dataNodeName;
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
index 583a34125..70f05c7d0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortSinkInfo;
import org.apache.inlong.manager.common.pojo.node.DataNodePageRequest;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.springframework.stereotype.Repository;
@@ -41,4 +42,5 @@ public interface DataNodeEntityMapper {
int deleteById(Integer id);
+ List<SortSinkInfo> selectAllSinkParams();
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index b6f88b833..ed22bdad4 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -18,6 +18,8 @@
package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortIdInfo;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortTaskInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
@@ -113,4 +115,18 @@ public interface StreamSinkEntityMapper {
int deleteByPrimaryKey(Integer id);
+ /**
+ * Select all tasks for sort-standalone
+ *
+ * @return All tasks
+ */
+ List<SortTaskInfo> selectAllTasks();
+
+ /**
+ * Select all id params for sort-standalone
+ *
+ * @return All id params
+ */
+ List<SortIdInfo> selectAllIdParams();
+
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
index 20109b878..2757aa0fe 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
@@ -163,4 +163,14 @@
from data_node
where id = #{id,jdbcType=INTEGER}
</delete>
+
+ <select id="selectAllSinkParams" resultType="org.apache.inlong.manager.common.pojo.sortstandalone.SortSinkInfo">
+ select name,
+ type,
+ ext_params
+ from data_node
+ <where>
+ is_deleted = 0
+ </where>
+ </select>
</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 3c9e6ce54..d387ee774 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -457,5 +457,28 @@
modify_time = now()
where id = #{id,jdbcType=INTEGER}
</update>
+ <select id="selectAllTasks" resultType="org.apache.inlong.manager.common.pojo.sortstandalone.SortTaskInfo">
+ select inlong_cluster_name as sortClusterName,
+ sort_task_name,
+ sort_consumer_group,
+ sink_type,
+ data_node_name
+ from stream_sink
+ <where>
+ is_deleted = 0
+ </where>
+ group by
+ inlong_cluster_name, sort_task_name, sort_consumer_group, sink_type, data_node_name
+ </select>
+ <select id="selectAllIdParams" resultType="org.apache.inlong.manager.common.pojo.sortstandalone.SortIdInfo">
+ select sort_task_name,
+ inlong_group_id,
+ inlong_stream_id,
+ ext_params
+ from stream_sink
+ <where>
+ is_deleted = 0
+ </where>
+ </select>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterService.java
similarity index 69%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterService.java
index 12e3e271f..dd98398bb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortTaskIdParamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortClusterService.java
@@ -17,19 +17,19 @@
package org.apache.inlong.manager.service.core;
-import java.util.List;
-import java.util.Map;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
/**
- * Sort id params service.
+ * Sort cluster config interface.
*/
-public interface SortTaskIdParamService {
+public interface SortClusterService {
/**
- * Select all id params by task name.
+ * Get the cluster config response by specific cluster name.
*
- * @param taskName WorkflowTask name.
- * @return List of all id params.
+ * @param clusterName Cluster name.
+ * @param md5 Last md5.
+ * @return Corresponding response.
*/
- List<Map<String, String>> selectByTaskName(String taskName);
+ SortClusterResponse getClusterConfig(String clusterName, String md5);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
deleted file mode 100644
index 51f9dc3ed..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterConfigServiceImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
-import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
-import org.apache.inlong.manager.service.core.SortClusterConfigService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-
-/**
- * Sort cluster config service implementation.
- */
-@Service
-public class SortClusterConfigServiceImpl implements SortClusterConfigService {
-
- @Autowired
- private SortClusterConfgiEntityMapper sortClusterConfgiEntityMapper;
-
- @Override
- public List<SortClusterConfigEntity> selectTasksByClusterName(String clusterName) {
- return sortClusterConfgiEntityMapper.selectTasksByClusterName(clusterName);
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
new file mode 100644
index 000000000..4b8bf2f04
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.impl;
+
+import com.google.gson.Gson;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortIdInfo;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortSinkInfo;
+import org.apache.inlong.manager.common.pojo.sortstandalone.SortTaskInfo;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.service.core.SortClusterService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.PostConstruct;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Caches the sort cluster config to reduce the number of database queries.
+ */
+@Service
+public class SortClusterServiceImpl implements SortClusterService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SortClusterServiceImpl.class);
+
+ private static final Gson gson = new Gson();
+
+ public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
+
+ private static final int RESPONSE_CODE_SUCCESS = 0;
+ private static final int RESPONSE_CODE_NO_UPDATE = 1;
+ private static final int RESPONSE_CODE_FAIL = -1;
+ private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
+
+ private static final String KEY_GROUP_ID = "inlongGroupId";
+ private static final String KEY_STREAM_ID = "inlongStreamId";
+
+    // The three caches below are replaced wholesale by reloadAllClusterConfig(), which runs on the
+    // reload timer thread, and are read by getClusterConfig() on caller threads. The references are
+    // volatile so that a freshly assigned map is visible to readers.
+    // key : sort cluster name, value : md5
+    private volatile Map<String, String> sortClusterMd5Map = new ConcurrentHashMap<>();
+    // key : sort cluster name, value : cluster config
+    private volatile Map<String, SortClusterConfig> sortClusterConfigMap = new ConcurrentHashMap<>();
+    // key : sort cluster name, value : error log
+    private volatile Map<String, String> sortClusterErrorLogMap = new ConcurrentHashMap<>();
+
+ private long reloadInterval;
+
+ @Autowired
+ private StreamSinkEntityMapper streamSinkEntityMapper;
+ @Autowired
+ private DataNodeEntityMapper dataNodeEntityMapper;
+
+    /**
+     * Load the cluster config once after bean construction and start the periodic reload timer.
+     * Any failure is logged instead of being propagated.
+     */
+    @PostConstruct
+    public void initialize() {
+        final String repositoryName = SortClusterServiceImpl.class.getSimpleName();
+        LOGGER.info("create repository for " + repositoryName);
+        try {
+            this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
+            this.reload();
+            this.setReloadTimer();
+        } catch (Throwable initFailure) {
+            LOGGER.error("Initialize SortClusterConfigRepository error", initFailure);
+        }
+    }
+
+    /**
+     * Rebuild all cached cluster configs.
+     * Every failure is caught and logged here, so this method never throws to its caller.
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public void reload() {
+        LOGGER.debug("start to reload sort config.");
+        try {
+            this.reloadAllClusterConfig();
+        } catch (Throwable reloadFailure) {
+            final String reason = reloadFailure.getMessage();
+            LOGGER.error(reason, reloadFailure);
+        }
+        LOGGER.debug("end to reload config");
+    }
+
+    /**
+     * Get the cached config of a sort cluster.
+     *
+     * @param clusterName Cluster name; a blank name yields a request-params-error response.
+     * @param md5 Md5 of the config the caller already holds, may be null.
+     * @return A fail response if the last reload recorded an error for this cluster, a
+     *         request-params-error response if no config is cached, a no-update response if
+     *         {@code md5} equals the cached md5, otherwise a success response with config and md5.
+     */
+    @Override
+    public SortClusterResponse getClusterConfig(String clusterName, String md5) {
+        // check if cluster name is valid or not.
+        if (StringUtils.isBlank(clusterName)) {
+            String errMsg = "Blank cluster name, return nothing";
+            LOGGER.info(errMsg);
+            return SortClusterResponse.builder()
+                    .msg(errMsg)
+                    .code(RESPONSE_CODE_REQ_PARAMS_ERROR)
+                    .build();
+        }
+
+        // Look every cache up exactly once. The reload replaces the three maps one after another,
+        // so repeated lookups could hit different map instances and return null unexpectedly.
+        String errorLog = sortClusterErrorLogMap.get(clusterName);
+        SortClusterConfig clusterConfig = sortClusterConfigMap.get(clusterName);
+        String latestMd5 = sortClusterMd5Map.get(clusterName);
+
+        // if there is an error
+        if (errorLog != null) {
+            return SortClusterResponse.builder()
+                    .msg(errorLog)
+                    .code(RESPONSE_CODE_FAIL)
+                    .build();
+        }
+
+        // there is no config
+        if (clusterConfig == null) {
+            String errMsg = "There is not config for cluster " + clusterName;
+            LOGGER.info(errMsg);
+            return SortClusterResponse.builder()
+                    .msg(errMsg)
+                    .code(RESPONSE_CODE_REQ_PARAMS_ERROR)
+                    .build();
+        }
+
+        // if the same md5; latestMd5 may be null while a reload is swapping the maps, in which case
+        // the full config is returned rather than failing with a NullPointerException.
+        if (latestMd5 != null && latestMd5.equals(md5)) {
+            return SortClusterResponse.builder()
+                    .msg("No update")
+                    .code(RESPONSE_CODE_NO_UPDATE)
+                    .md5(md5)
+                    .build();
+        }
+
+        return SortClusterResponse.builder()
+                .msg("Success")
+                .code(RESPONSE_CODE_SUCCESS)
+                .data(clusterConfig)
+                .md5(latestMd5)
+                .build();
+    }
+
+    /**
+     * Reload all cluster config.
+     *
+     * <p>
+     * The reload results, including config, md5 and error log, will replace the older ones.
+     * A failure while building one cluster only records an error log for that cluster; the other
+     * clusters are still refreshed.
+     * </p>
+     */
+    private void reloadAllClusterConfig() {
+        // get all task and group by cluster
+        List<SortTaskInfo> tasks = streamSinkEntityMapper.selectAllTasks();
+        Map<String, List<SortTaskInfo>> clusterTaskMap =
+                tasks.stream()
+                        .filter(dto -> dto.getSortClusterName() != null)
+                        .collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
+
+        // get all id params and group by task
+        List<SortIdInfo> idParams = streamSinkEntityMapper.selectAllIdParams();
+        Map<String, List<SortIdInfo>> taskIdParamMap =
+                idParams.stream()
+                        .filter(dto -> dto.getSortTaskName() != null)
+                        .collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
+
+        // get all sink params and group by data node name
+        List<SortSinkInfo> sinkParams = dataNodeEntityMapper.selectAllSinkParams();
+        Map<String, SortSinkInfo> taskSinkParamMap =
+                sinkParams.stream()
+                        .filter(dto -> dto.getName() != null)
+                        // Without a merge function a duplicated name throws and aborts the whole
+                        // reload; keep the first entry and report the duplicate instead.
+                        .collect(Collectors.toMap(SortSinkInfo::getName, param -> param,
+                                (first, duplicate) -> {
+                                    LOGGER.warn("Duplicate data node name {}, keep the first one",
+                                            first.getName());
+                                    return first;
+                                }));
+
+        // update config of each cluster
+        Map<String, SortClusterConfig> newConfigMap = new ConcurrentHashMap<>();
+        Map<String, String> newMd5Map = new ConcurrentHashMap<>();
+        Map<String, String> newErrorlogMap = new ConcurrentHashMap<>();
+        clusterTaskMap.forEach((clusterName, taskList) -> {
+            try {
+                // get config, then update config map and md5
+                SortClusterConfig clusterConfig =
+                        getConfigByClusterName(clusterName, taskList, taskIdParamMap, taskSinkParamMap);
+                String jsonStr = gson.toJson(clusterConfig);
+                String md5 = DigestUtils.md5Hex(jsonStr);
+                newConfigMap.put(clusterName, clusterConfig);
+                newMd5Map.put(clusterName, md5);
+            } catch (Throwable e) {
+                // if get config failed, update the err log.
+                // ConcurrentHashMap rejects null values, and exceptions such as
+                // NullPointerException may carry no message, so fall back to toString().
+                String errMsg = e.getMessage() != null ? e.getMessage() : e.toString();
+                newErrorlogMap.put(clusterName, errMsg);
+                LOGGER.error("Failed to update cluster config of {}, error is {}", clusterName, errMsg, e);
+            }
+        });
+        sortClusterErrorLogMap = newErrorlogMap;
+        sortClusterConfigMap = newConfigMap;
+        sortClusterMd5Map = newMd5Map;
+    }
+
+    /**
+     * Build the config of one cluster from its tasks.
+     *
+     * @param clusterName Cluster name.
+     * @param tasks Tasks that belong to this cluster.
+     * @param taskIdParamMap Id params of all tasks, keyed by sort task name.
+     * @param taskSinkParamMap Sink params of all data nodes, keyed by data node name.
+     * @return The sort cluster config holding one task config per task.
+     */
+    private SortClusterConfig getConfigByClusterName(
+            String clusterName,
+            List<SortTaskInfo> tasks,
+            Map<String, List<SortIdInfo>> taskIdParamMap,
+            Map<String, SortSinkInfo> taskSinkParamMap) {
+
+        // id params are looked up by task name, sink params by the task's data node name
+        List<SortTaskConfig> sortTaskConfigs = tasks.stream()
+                .map(taskInfo -> this.getTaskConfig(
+                        taskInfo.getSortTaskName(),
+                        taskInfo.getSinkType(),
+                        taskIdParamMap.get(taskInfo.getSortTaskName()),
+                        taskSinkParamMap.get(taskInfo.getDataNodeName())))
+                .collect(Collectors.toList());
+
+        SortClusterConfig clusterConfig = SortClusterConfig.builder()
+                .clusterName(clusterName)
+                .sortTasks(sortTaskConfigs)
+                .build();
+        return clusterConfig;
+    }
+
+    /**
+     * Get task config.
+     * <p>
+     * If there is no any id or sink params, throw exception to upper caller.
+     * </p>
+     *
+     * @param taskName Task name.
+     * @param type Type of sink.
+     * @param idParams Id params.
+     * @param sinkParams Sink params.
+     * @return Task config.
+     * @throws IllegalStateException If {@code idParams} or {@code sinkParams} is null.
+     * @throws IllegalArgumentException If {@code type} is null or differs (ignoring case) from
+     *         the type of {@code sinkParams}.
+     */
+    private SortTaskConfig getTaskConfig(
+            String taskName,
+            String type,
+            List<SortIdInfo> idParams,
+            SortSinkInfo sinkParams) {
+
+        if (idParams == null) {
+            throw new IllegalStateException("There is no any id params of task " + taskName);
+        }
+        if (sinkParams == null) {
+            throw new IllegalStateException("There is no any sink params of task " + taskName);
+        }
+
+        // A null task type is reported as a mismatch instead of failing with a message-less
+        // NullPointerException; equalsIgnoreCase(null) already returns false for a null sink type.
+        if (type == null || !type.equalsIgnoreCase(sinkParams.getType())) {
+            throw new IllegalArgumentException(
+                    String.format("for task %s, task type %s and sink type %s are not identical",
+                            taskName, type, sinkParams.getType()));
+        }
+
+        return SortTaskConfig.builder()
+                .name(taskName)
+                .type(type)
+                .idParams(this.parseIdParams(idParams))
+                .sinkParams(this.parseSinkParams(sinkParams))
+                .build();
+
+    }
+
+    /**
+     * Parse id params from json.
+     * <p>
+     * Each row's ext params json is parsed into a map, then the row's group id and stream id are
+     * added to it. A row without ext params yields a map holding only the two ids.
+     * </p>
+     *
+     * @param rowIdParams IdParams in json format.
+     * @return List of IdParams.
+     */
+    private List<Map<String, String>> parseIdParams(List<SortIdInfo> rowIdParams) {
+        return rowIdParams.stream()
+                .map(row -> {
+                    Map<String, String> parsed = gson.fromJson(row.getExtParams(), HashMap.class);
+                    // Gson returns null for null or empty json text; start from an empty map so
+                    // the ids can still be attached.
+                    Map<String, String> param = parsed != null ? parsed : new HashMap<>();
+                    // put group and stream info
+                    param.put(KEY_GROUP_ID, row.getInlongGroupId());
+                    param.put(KEY_STREAM_ID, row.getInlongStreamId());
+                    return param;
+                })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Convert the ext params json of a sink into a key-value map.
+     *
+     * @param rowSinkParams Sink info whose ext params hold the json text.
+     * @return Sink params parsed by Gson.
+     */
+    private Map<String, String> parseSinkParams(SortSinkInfo rowSinkParams) {
+        final String extParamsJson = rowSinkParams.getExtParams();
+        return gson.fromJson(extParamsJson, HashMap.class);
+    }
+
+    /**
+     * Set reload timer at the beginning of repository.
+     * <p>
+     * The timer runs {@link #reload()} every {@code reloadInterval} milliseconds on a single
+     * named daemon thread, so the never-shut-down executor cannot keep the JVM alive on exit.
+     * </p>
+     */
+    private void setReloadTimer() {
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
+            Thread thread = new Thread(runnable, "sort-cluster-config-reload");
+            thread.setDaemon(true);
+            return thread;
+        });
+        executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS);
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 4b3406b57..a7f30db83 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -22,23 +22,16 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sdk.CacheZone;
import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
-import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import org.apache.inlong.manager.service.core.SortClusterConfigService;
+import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortSourceService;
-import org.apache.inlong.manager.service.core.SortTaskIdParamService;
import org.apache.inlong.manager.service.core.SortService;
-import org.apache.inlong.manager.service.core.SortTaskSinkParamService;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
/**
@@ -54,88 +47,15 @@ public class SortServiceImpl implements SortService {
private static final int RESPONSE_CODE_FAIL = -1;
private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
- @Autowired private SortClusterConfigService sortClusterConfigService;
-
- @Autowired private SortTaskIdParamService sortTaskIdParamService;
-
- @Autowired private SortTaskSinkParamService sortTaskSinkParamService;
@Autowired private SortSourceService sortSourceService;
+ @Autowired private SortClusterService sortClusterService;
+
@Override
public SortClusterResponse getClusterConfig(String clusterName, String md5) {
- LOGGER.info("start getClusterConfig");
-
- // check if cluster name is valid or not.
- if (StringUtils.isBlank(clusterName)) {
- String errMsg = "Blank cluster name, return nothing";
- LOGGER.info(errMsg);
- return SortClusterResponse.builder().msg(errMsg).build();
- }
-
- // check if there is any task.
- List<SortClusterConfigEntity> tasks = sortClusterConfigService.selectTasksByClusterName(clusterName);
-
- if (tasks == null || tasks.isEmpty()) {
- String errMsg = "There is not any task for cluster" + clusterName;
- LOGGER.info(errMsg);
- return SortClusterResponse.builder()
- .code(RESPONSE_CODE_REQ_PARAMS_ERROR)
- .msg(errMsg)
- .build();
- }
-
- // add task configs
- List<SortTaskConfig> taskConfigs = new ArrayList<>();
- try {
- tasks.forEach(clusterConfig -> taskConfigs.add(this.getTaskConfig(clusterConfig)));
- } catch (IllegalArgumentException ex) {
- String errMsg = "Got illegal sink type from db, " + ex.getMessage();
- LOGGER.info(errMsg);
- return SortClusterResponse.builder()
- .code(RESPONSE_CODE_FAIL)
- .msg(errMsg)
- .build();
- }
-
- SortClusterConfig clusterConfig = SortClusterConfig.builder()
- .clusterName(clusterName)
- .sortTasks(taskConfigs)
- .build();
-
- JSONObject job = new JSONObject(clusterConfig);
- String localMd5 = DigestUtils.md5Hex(job.toString());
-
- // no update
- if (localMd5.equals(md5)) {
- return SortClusterResponse.builder()
- .code(RESPONSE_CODE_NO_UPDATE)
- .msg("No update")
- .md5(localMd5)
- .build();
- }
-
- return SortClusterResponse.builder()
- .code(RESPONSE_CODE_SUCCESS)
- .data(clusterConfig)
- .msg("success")
- .md5(localMd5)
- .build();
- }
- private SortTaskConfig getTaskConfig(SortClusterConfigEntity clusterConfig) {
- String sinkType = clusterConfig.getSinkType().toUpperCase();
- List<Map<String, String>> idParams =
- sortTaskIdParamService.selectByTaskName(clusterConfig.getTaskName());
- Map<String, String> sinkParams =
- sortTaskSinkParamService
- .selectByTaskNameAndType(clusterConfig.getTaskName(), clusterConfig.getSinkType());
- return SortTaskConfig.builder()
- .name(clusterConfig.getTaskName())
- .type(sinkType)
- .idParams(idParams)
- .sinkParams(sinkParams)
- .build();
+ return sortClusterService.getClusterConfig(clusterName, md5);
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
deleted file mode 100644
index cab343d49..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskIdParamServiceImpl.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.dao.entity.SortTaskIdParamEntity;
-import org.apache.inlong.manager.dao.mapper.SortTaskIdParamEntityMapper;
-import org.apache.inlong.manager.service.core.SortTaskIdParamService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Sort id params service implementation.
- */
-@Service
-public class SortTaskIdParamServiceImpl implements SortTaskIdParamService {
-
- private static final String KEY_GROUP_ID = "inlongGroupId";
-
- private static final String KEY_STREAM_ID = "inlongStreamId";
-
- @Autowired private SortTaskIdParamEntityMapper sortTaskIdParamEntityMapper;
-
- @Override
- public List<Map<String, String>> selectByTaskName(String taskName) {
- List<SortTaskIdParamEntity> taskIdParamEntityList =
- sortTaskIdParamEntityMapper.selectByTaskName(taskName);
- Map<String, Map<String, String>> idParams = new HashMap<>();
- taskIdParamEntityList.forEach(entity -> {
- Map<String, String> idParam =
- idParams.computeIfAbsent(entity.getKey(), key -> new HashMap<>());
- idParam.put(entity.getParamKey(), entity.getParamValue());
- idParam.putIfAbsent(KEY_GROUP_ID, entity.getGroupId());
- idParam.putIfAbsent(KEY_STREAM_ID, entity.getStreamId());
- });
- return new ArrayList<>(idParams.values());
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
deleted file mode 100644
index 0cf5af26f..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortTaskSinkParamServiceImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
-import org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper;
-import org.apache.inlong.manager.service.core.SortTaskSinkParamService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Sort sink params service implementation.
- */
-@Service
-public class SortTaskSinkParamServiceImpl implements SortTaskSinkParamService {
- private static final Logger LOGGER = LoggerFactory.getLogger(SortTaskSinkParamServiceImpl.class);
-
- @Autowired
- private SortTaskSinkParamEntityMapper sortTaskSinkParamEntityMapper;
-
- @Override
- public Map<String, String> selectByTaskNameAndType(String taskName, String sinkType) {
- LOGGER.info("task name is {}, sink type is {}", taskName, sinkType);
- List<SortTaskSinkParamEntity> taskSinkParamEntityList =
- sortTaskSinkParamEntityMapper.selectByTaskNameAndType(taskName);
- Map<String, String> sinkParams = new HashMap<>();
- taskSinkParamEntityList.forEach(entity -> sinkParams.put(entity.getParamKey(), entity.getParamValue()));
- return sinkParams;
- }
-
-}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
index 305289377..aacadaa12 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/SortControllerTest.java
@@ -18,8 +18,6 @@
package org.apache.inlong.manager.web.controller.openapi;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
import org.apache.inlong.manager.dao.entity.SortTaskIdParamEntity;
import org.apache.inlong.manager.dao.entity.SortTaskSinkParamEntity;
@@ -27,15 +25,11 @@ import org.apache.inlong.manager.dao.mapper.SortClusterConfgiEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortTaskIdParamEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortTaskSinkParamEntityMapper;
import org.apache.inlong.manager.web.WebBaseTest;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.RequestBuilder;
-import org.springframework.transaction.annotation.Transactional;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
@@ -70,29 +64,7 @@ class SortControllerTest extends WebBaseTest {
sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask1", "kafka"));
sortClusterConfgiEntityMapper.insert(this.prepareClusterConfigEntity("testTask2", "pulsar"));
}
-
- /**
- * Test if the interface works.
- *
- * @throws Exception Exceptions to request generating.
- */
- @Test
- @Transactional
- void testGetSortClusterConfig() throws Exception {
- MvcResult mvcResult = mockMvc.perform(
- get("/openapi/sort/getClusterConfig")
- .param("clusterName", "testCluster")
- .param("md5", "testMd5")
- )
- .andExpect(status().isOk())
- .andReturn();
-
- SortClusterResponse sortClusterResponse = JsonUtils.parseObject(mvcResult.getResponse().getContentAsString(),
- SortClusterResponse.class);
- Assertions.assertNotNull(sortClusterResponse);
- Assertions.assertEquals("testCluster", sortClusterResponse.getData().getClusterName());
- }
-
+
// @Test
// @Transactional
public void testErrorSinkType() throws Exception {