You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/31 06:11:16 UTC
[inlong] branch master updated: [INLONG-6288][Manager] Refactor getSortClusterConfig by using MyBatis Cursor (#6294)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 48d8fce48 [INLONG-6288][Manager] Refactor getSortClusterConfig by using MyBatis Cursor (#6294)
48d8fce48 is described below
commit 48d8fce48fc4530f775febb8e93a796e044898c5
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Oct 31 14:11:11 2022 +0800
[INLONG-6288][Manager] Refactor getSortClusterConfig by using MyBatis Cursor (#6294)
---
.../manager/dao/mapper/DataNodeEntityMapper.java | 6 +
.../manager/dao/mapper/StreamSinkEntityMapper.java | 9 +-
.../dao/mapper/StreamSinkFieldEntityMapper.java | 7 +
.../resources/mappers/DataNodeEntityMapper.xml | 7 +-
.../resources/mappers/StreamSinkEntityMapper.xml | 13 +-
.../mappers/StreamSinkFieldEntityMapper.xml | 10 +-
.../pojo/sort/standalone/SortFieldInfo.java | 27 ++++
.../manager/service/core/SortConfigLoader.java | 28 ++++
.../service/core/impl/SortClusterServiceImpl.java | 176 +++++++++------------
.../service/core/impl/SortConfigLoaderImpl.java | 46 ++++++
.../service/core/impl/SortSourceServiceImpl.java | 7 +
.../service/node/AbstractDataNodeOperator.java | 10 ++
.../manager/service/node/DataNodeOperator.java | 9 ++
.../manager/service/sink/AbstractSinkOperator.java | 26 ++-
.../manager/service/sink/StreamSinkOperator.java | 9 ++
.../manager/service/sort/SortServiceImplTest.java | 72 +++++----
16 files changed, 312 insertions(+), 150 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
index ac17f283e..e246f4931 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataNodeEntityMapper.java
@@ -17,7 +17,10 @@
package org.apache.inlong.manager.dao.mapper;
+import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.mapping.ResultSetType;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
import org.apache.inlong.manager.pojo.sort.standalone.SortSinkInfo;
@@ -36,6 +39,9 @@ public interface DataNodeEntityMapper {
List<DataNodeEntity> selectByCondition(DataNodePageRequest request);
+ /**
+ * Select all data nodes whose {@code is_deleted} flag is 0, exposed as a MyBatis cursor.
+ * <p/>
+ * NOTE(review): {@code fetchSize = Integer.MIN_VALUE} looks like the MySQL Connector/J
+ * row-streaming hint; confirm it is valid for every JDBC driver this mapper runs against.
+ *
+ * @return cursor over the matching data nodes
+ */
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+ Cursor<DataNodeEntity> selectAllDataNodes();
+
List<SortSinkInfo> selectAllSinkParams();
int updateById(DataNodeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 20fbd9e55..63dbf098d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -120,21 +120,24 @@ public interface StreamSinkEntityMapper {
*/
List<SinkInfo> selectAllConfig(@Param("groupId") String groupId, @Param("idList") List<String> streamIdList);
- List<StreamSinkEntity> selectAllStreamSinks();
+ /**
+ * Select all stream sinks whose {@code is_deleted} flag is 0, exposed as a MyBatis cursor.
+ * <p/>
+ * NOTE(review): {@code fetchSize = Integer.MIN_VALUE} looks like the MySQL Connector/J
+ * row-streaming hint; confirm it is valid for every JDBC driver this mapper runs against.
+ *
+ * @return cursor over the matching stream sinks
+ */
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+ Cursor<StreamSinkEntity> selectAllStreamSinks();
/**
* Select all tasks for sort-standalone
*
* @return All tasks
*/
- List<SortTaskInfo> selectAllTasks();
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+ Cursor<SortTaskInfo> selectAllTasks();
/**
* Select all id params for sort-standalone
*
* @return All id params
*/
- List<SortIdInfo> selectAllIdParams();
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+ Cursor<SortIdInfo> selectAllIdParams();
/**
* Select all streams for sort sdk.
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
index b487deda7..fbba1fce0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkFieldEntityMapper.java
@@ -17,8 +17,12 @@
package org.apache.inlong.manager.dao.mapper;
+import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.mapping.ResultSetType;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.springframework.stereotype.Repository;
import java.util.List;
@@ -34,6 +38,9 @@ public interface StreamSinkFieldEntityMapper {
List<StreamSinkFieldEntity> selectFields(@Param("groupId") String groupId, @Param("streamId") String streamId);
+ /**
+ * Select the group id, stream id and field name of every sink field whose
+ * {@code is_deleted} flag is 0, ordered by id, exposed as a MyBatis cursor.
+ * <p/>
+ * NOTE(review): {@code fetchSize = Integer.MIN_VALUE} looks like the MySQL Connector/J
+ * row-streaming hint; confirm it is valid for every JDBC driver this mapper runs against.
+ *
+ * @return cursor over the matching sink fields
+ */
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)
+ Cursor<SortFieldInfo> selectAllFields();
+
/**
* According to the sink id, query the sink field.
*
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
index 90ceccaf3..3cfcbf398 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataNodeEntityMapper.xml
@@ -100,7 +100,12 @@
from data_node
where is_deleted = 0
</select>
-
+ <!-- All data nodes with is_deleted = 0; the mapper interface declares this statement with a Cursor return type. -->
+ <select id="selectAllDataNodes" resultType="org.apache.inlong.manager.dao.entity.DataNodeEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from data_node
+ where is_deleted = 0
+ </select>
<update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.DataNodeEntity">
update data_node
set name = #{name, jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index aac142d59..f6422643c 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -350,12 +350,6 @@
</if>
</where>
</select>
- <select id="selectAllStreamSinks" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
- select
- <include refid="Base_Column_List"/>
- from stream_sink
- where is_deleted = 0
- </select>
<select id="selectAllTasks" resultType="org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo">
select inlong_cluster_name as sortClusterName,
sort_task_name,
@@ -382,7 +376,12 @@
from stream_sink
where is_deleted = 0
</select>
-
+ <!-- All stream sinks with is_deleted = 0; the mapper interface declares this statement with a Cursor return type. -->
+ <select id="selectAllStreamSinks" resultType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_sink
+ where is_deleted = 0
+ </select>
<update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
update stream_sink
<set>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index 8c85171ff..8273c3334 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -119,7 +119,15 @@
and field.is_deleted = 0
and sink.is_deleted = 0
</select>
-
+ <!-- Group id, stream id and field name of all sink fields with is_deleted = 0, in ascending id order. -->
+ <!-- NOTE(review): columns are not aliased, so mapping onto SortFieldInfo presumably relies on underscore-to-camel-case mapping being enabled; confirm. -->
+ <select id="selectAllFields" resultType="org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo">
+ select
+ inlong_group_id,
+ inlong_stream_id,
+ field_name
+ from stream_sink_field
+ where is_deleted = 0
+ order by id asc
+ </select>
<update id="logicDeleteAll">
update stream_sink_field
set is_deleted = id
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java
new file mode 100644
index 000000000..f686ed065
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortFieldInfo.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.standalone;
+
+import lombok.Data;
+
+/**
+ * Sink field info used for the sort-standalone config; it is the result type of the
+ * {@code selectAllFields} mapper statement.
+ */
+@Data
+public class SortFieldInfo {
+ // Inlong group id of the sink field
+ private String inlongGroupId;
+ // Inlong stream id of the sink field
+ private String inlongStreamId;
+ // Name of the sink field
+ private String fieldName;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
index 7bfa2a8ec..6b0a02fc5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortConfigLoader.java
@@ -17,12 +17,16 @@
package org.apache.inlong.manager.service.core;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
+import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
import java.util.List;
@@ -67,4 +71,28 @@ public interface SortConfigLoader {
* @return List of stream info
*/
List<SortSourceStreamInfo> loadAllStreams();
+
+ /**
+ * Load all inlong stream sink entities by cursor.
+ *
+ * @return List of stream sink entities
+ */
+ List<StreamSinkEntity> loadAllStreamSinkEntity();
+
+ /**
+ * Load the info of all sort tasks.
+ *
+ * @return List of task info
+ */
+ List<SortTaskInfo> loadAllTask();
+
+ /**
+ * Load all data node entities.
+ *
+ * @return List of data node entities
+ */
+ List<DataNodeEntity> loadAllDataNodeEntity();
+
+ /**
+ * Load the info of all sink fields.
+ *
+ * @return List of field info
+ */
+ List<SortFieldInfo> loadAllFields();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
index b9584cf96..e55aa3ea8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortClusterServiceImpl.java
@@ -23,12 +23,17 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import org.apache.inlong.manager.pojo.sort.standalone.SortIdInfo;
-import org.apache.inlong.manager.pojo.sort.standalone.SortSinkInfo;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
-import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
-import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.service.core.SortClusterService;
+import org.apache.inlong.manager.service.core.SortConfigLoader;
+import org.apache.inlong.manager.service.node.DataNodeOperator;
+import org.apache.inlong.manager.service.node.DataNodeOperatorFactory;
+import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
+import org.apache.inlong.manager.service.sink.StreamSinkOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -37,6 +42,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -67,6 +73,7 @@ public class SortClusterServiceImpl implements SortClusterService {
private static final String KEY_GROUP_ID = "inlongGroupId";
private static final String KEY_STREAM_ID = "inlongStreamId";
+ private Map<String, List<String>> fieldMap;
// key : sort cluster name, value : md5
private Map<String, String> sortClusterMd5Map = new ConcurrentHashMap<>();
@@ -78,9 +85,11 @@ public class SortClusterServiceImpl implements SortClusterService {
private long reloadInterval;
@Autowired
- private StreamSinkEntityMapper streamSinkEntityMapper;
+ private SortConfigLoader sortConfigLoader;
@Autowired
- private DataNodeEntityMapper dataNodeEntityMapper;
+ private SinkOperatorFactory sinkOperatorFactory;
+ @Autowired
+ private DataNodeOperatorFactory dataNodeOperatorFactory;
@PostConstruct
public void initialize() {
@@ -98,7 +107,7 @@ public class SortClusterServiceImpl implements SortClusterService {
public void reload() {
LOGGER.debug("start to reload sort config");
try {
- reloadAllClusterConfig();
+ reloadAllClusterConfigV2();
} catch (Throwable t) {
LOGGER.error(t.getMessage(), t);
}
@@ -152,78 +161,83 @@ public class SortClusterServiceImpl implements SortClusterService {
.build();
}
- /**
- * Reload all cluster config.
- * The results including config, md5 and error log, will replace the older ones.
- */
- private void reloadAllClusterConfig() {
- // get all task and group by cluster
- List<SortTaskInfo> tasks = streamSinkEntityMapper.selectAllTasks();
+ private void reloadAllClusterConfigV2() {
+ // load all fields info
+ List<SortFieldInfo> fieldInfos = sortConfigLoader.loadAllFields();
+ fieldMap = new HashMap<>();
+ fieldInfos.forEach(info -> {
+ List<String> fields = fieldMap.computeIfAbsent(info.getInlongGroupId(), k -> new ArrayList<>());
+ fields.add(info.getFieldName());
+ });
+
+ List<StreamSinkEntity> sinkEntities = sortConfigLoader.loadAllStreamSinkEntity();
+ // get all task under a given cluster, has been reduced into cluster and task.
+ List<SortTaskInfo> tasks = sortConfigLoader.loadAllTask();
Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
.filter(dto -> dto.getSortClusterName() != null)
.collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
- // get all id params and group by task
- List<SortIdInfo> idParams = streamSinkEntityMapper.selectAllIdParams();
- Map<String, List<SortIdInfo>> taskIdParamMap = idParams.stream()
- .filter(dto -> dto.getSortTaskName() != null)
- .collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
-
- // get all sink params and group by data node name
- List<SortSinkInfo> sinkParams = dataNodeEntityMapper.selectAllSinkParams();
- Map<String, SortSinkInfo> taskSinkParamMap = sinkParams.stream()
- .filter(dto -> dto.getName() != null)
- .collect(Collectors.toMap(SortSinkInfo::getName, param -> param));
+ // get all stream sinks
+ Map<String, List<StreamSinkEntity>> task2AllStreams = sinkEntities.stream()
+ .filter(entity -> StringUtils.isNotBlank(entity.getInlongClusterName()))
+ .collect(Collectors.groupingBy(StreamSinkEntity::getSinkName));
+
+ // get all data nodes and group by node name
+ List<DataNodeEntity> dataNodeEntities = sortConfigLoader.loadAllDataNodeEntity();
+ Map<String, DataNodeInfo> task2DataNodeMap = dataNodeEntities.stream()
+ .filter(entity -> StringUtils.isNotBlank(entity.getName()))
+ .map(entity -> {
+ DataNodeOperator operator = dataNodeOperatorFactory.getInstance(entity.getType());
+ return operator.getFromEntity(entity);
+ })
+ .collect(Collectors.toMap(DataNodeInfo::getName, info -> info));
- // update config of each cluster
+ // re-org all SortClusterConfigs
Map<String, SortClusterConfig> newConfigMap = new ConcurrentHashMap<>();
Map<String, String> newMd5Map = new ConcurrentHashMap<>();
Map<String, String> newErrorLogMap = new ConcurrentHashMap<>();
+
clusterTaskMap.forEach((clusterName, taskList) -> {
try {
- // get config, then update config map and md5
- SortClusterConfig clusterConfig = getConfigByClusterName(clusterName, taskList, taskIdParamMap,
- taskSinkParamMap);
- String jsonStr = GSON.toJson(clusterConfig);
+ SortClusterConfig config = this.getConfigByClusterNameV2(clusterName,
+ taskList, task2AllStreams, task2DataNodeMap);
+ String jsonStr = GSON.toJson(config);
String md5 = DigestUtils.md5Hex(jsonStr);
- newConfigMap.put(clusterName, clusterConfig);
+ newConfigMap.put(clusterName, config);
newMd5Map.put(clusterName, md5);
} catch (Throwable e) {
// if get config failed, update the err log.
newErrorLogMap.put(clusterName, e.getMessage());
- LOGGER.error("Failed to update cluster config of {}, error is {}", clusterName, e.getMessage());
- LOGGER.error(e.getMessage(), e);
+ LOGGER.error("Failed to update cluster config={}, error={}", clusterName, e.getMessage());
}
});
+
sortClusterErrorLogMap = newErrorLogMap;
sortClusterConfigMap = newConfigMap;
sortClusterMd5Map = newMd5Map;
}
- /**
- * Get the latest config of specific cluster.
- *
- * @param clusterName Cluster name.
- * @param tasks Task in this cluster.
- * @param taskIdParamMap All id params.
- * @param taskSinkParamMap All sink params.
- * @return The sort cluster config of specific cluster.
- */
- private SortClusterConfig getConfigByClusterName(
+ private SortClusterConfig getConfigByClusterNameV2(
String clusterName,
List<SortTaskInfo> tasks,
- Map<String, List<SortIdInfo>> taskIdParamMap,
- Map<String, SortSinkInfo> taskSinkParamMap) {
+ Map<String, List<StreamSinkEntity>> task2AllStreams,
+ Map<String, DataNodeInfo> task2DataNodeMap) {
List<SortTaskConfig> taskConfigs = tasks.stream()
.map(task -> {
String taskName = task.getSortTaskName();
String type = task.getSinkType();
- List<SortIdInfo> idParams = taskIdParamMap.get(taskName);
- SortSinkInfo sinkParams = taskSinkParamMap.get(task.getDataNodeName());
- return this.getTaskConfig(taskName, type, idParams, sinkParams);
+ String dataNodeName = task.getDataNodeName();
+ DataNodeInfo nodeInfo = task2DataNodeMap.get(dataNodeName);
+ List<StreamSinkEntity> streams = task2AllStreams.get(taskName);
+
+ return SortTaskConfig.builder()
+ .name(taskName)
+ .type(type)
+ .idParams(this.parseIdParamsV2(streams))
+ .sinkParams(this.parseSinkParamsV2(nodeInfo))
+ .build();
})
- .filter(Objects::nonNull)
.collect(Collectors.toList());
return SortClusterConfig.builder()
@@ -232,64 +246,20 @@ public class SortClusterServiceImpl implements SortClusterService {
.build();
}
- /**
- * Get task config.
- * <p/>
- * If there is not any id or sink params, throw exception to upper caller.
- *
- * @param taskName Task name.
- * @param type Type of sink.
- * @param idParams Id params.
- * @param sinkParams Sink params.
- * @return Task config.
- */
- private SortTaskConfig getTaskConfig(String taskName, String type, List<SortIdInfo> idParams,
- SortSinkInfo sinkParams) {
- // return null if id params or sink params are empty.
- if (idParams == null || sinkParams == null) {
- return null;
- }
-
- if (!type.equalsIgnoreCase(sinkParams.getType())) {
- throw new IllegalArgumentException(
- String.format("task type %s and sink type %s are not identical for task name %s",
- type, sinkParams.getType(), taskName));
- }
-
- return SortTaskConfig.builder()
- .name(taskName)
- .type(type)
- .idParams(this.parseIdParams(idParams))
- .sinkParams(this.parseSinkParams(sinkParams))
- .build();
- }
-
- /**
- * Parse id params from json.
- *
- * @param rowIdParams IdParams in json format.
- * @return List of IdParams.
- */
- private List<Map<String, String>> parseIdParams(List<SortIdInfo> rowIdParams) {
- return rowIdParams.stream()
- .map(row -> {
- Map<String, String> param = GSON.fromJson(row.getExtParams(), HashMap.class);
- // put group and stream info
- param.put(KEY_GROUP_ID, row.getInlongGroupId());
- param.put(KEY_STREAM_ID, row.getInlongStreamId());
- return param;
+    /**
+     * Build the id params of one sort task from the stream sinks mapped to it.
+     * <p/>
+     * Every sink is converted by the operator registered for its sink type; sinks for which
+     * the operator returns {@code null} are dropped.
+     *
+     * @param streams stream sinks of the task, {@code null} when no sink is mapped to the task
+     * @return id params of the task, empty when there is no usable stream sink
+     */
+    private List<Map<String, String>> parseIdParamsV2(List<StreamSinkEntity> streams) {
+        // the caller looks the sinks up by task name, so a task without any sink arrives as null;
+        // treat it as "no ids" instead of failing the whole cluster with an NPE
+        if (streams == null) {
+            return new ArrayList<>();
+        }
+        return streams.stream()
+                .map(streamSink -> {
+                    StreamSinkOperator operator = sinkOperatorFactory.getInstance(streamSink.getSinkType());
+                    // NOTE(review): fieldMap is keyed by group id only, so the fields of all streams
+                    // in the group are passed here; confirm whether a per-stream lookup is intended
+                    List<String> fields = fieldMap.get(streamSink.getInlongGroupId());
+                    return operator.parse2IdParams(streamSink, fields);
                 })
+                .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
- /**
- * Parse sink params from json.
- *
- * @param rowSinkParams Sink params in json format.
- * @return Sink params.
- */
- private Map<String, String> parseSinkParams(SortSinkInfo rowSinkParams) {
- return GSON.fromJson(rowSinkParams.getExtParams(), HashMap.class);
+    /**
+     * Convert the data node of a sort task into its sink params.
+     *
+     * @param nodeInfo data node referenced by the task
+     * @return sink params produced by the operator registered for the node type
+     * @throws IllegalArgumentException if {@code nodeInfo} is {@code null}, i.e. the task
+     *         references a data node that was not found
+     */
+    private Map<String, String> parseSinkParamsV2(DataNodeInfo nodeInfo) {
+        // fail with an explicit message: an implicit NullPointerException may carry a null message,
+        // which cannot be stored as a value in the ConcurrentHashMap-backed error log of the reload
+        if (nodeInfo == null) {
+            throw new IllegalArgumentException("cannot parse sink params, data node info is null");
+        }
+        DataNodeOperator operator = dataNodeOperatorFactory.getInstance(nodeInfo.getType());
+        return operator.parse2SinkParams(nodeInfo);
     }
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
index f442f9a46..100c53d4f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortConfigLoaderImpl.java
@@ -18,18 +18,24 @@
package org.apache.inlong.manager.service.core.impl;
import org.apache.ibatis.cursor.Cursor;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
+import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -45,6 +51,8 @@ public class SortConfigLoaderImpl implements SortConfigLoader {
@Autowired
private StreamSinkEntityMapper streamSinkEntityMapper;
@Autowired
+ private StreamSinkFieldEntityMapper streamSinkFieldEntityMapper;
+ @Autowired
private InlongGroupEntityMapper inlongGroupEntityMapper;
@Autowired
private InlongGroupExtEntityMapper inlongGroupExtEntityMapper;
@@ -52,6 +60,8 @@ public class SortConfigLoaderImpl implements SortConfigLoader {
private InlongStreamExtEntityMapper inlongStreamExtEntityMapper;
@Autowired
private InlongStreamEntityMapper inlongStreamEntityMapper;
+ @Autowired
+ private DataNodeEntityMapper dataNodeEntityMapper;
@Transactional
@Override
@@ -106,4 +116,40 @@ public class SortConfigLoaderImpl implements SortConfigLoader {
cursor.forEach(allStreams::add);
return allStreams;
}
+
+    /**
+     * Load all stream sink entities through a MyBatis cursor.
+     *
+     * @return all stream sink entities returned by the mapper, never {@code null}
+     */
+    @Transactional
+    @Override
+    public List<StreamSinkEntity> loadAllStreamSinkEntity() {
+        return drainAndClose(streamSinkEntityMapper.selectAllStreamSinks());
+    }
+
+    /**
+     * Load all sort task info through a MyBatis cursor.
+     *
+     * @return all task info returned by the mapper, never {@code null}
+     */
+    @Transactional
+    @Override
+    public List<SortTaskInfo> loadAllTask() {
+        return drainAndClose(streamSinkEntityMapper.selectAllTasks());
+    }
+
+    /**
+     * Load all data node entities through a MyBatis cursor.
+     *
+     * @return all data node entities returned by the mapper, never {@code null}
+     */
+    @Transactional
+    @Override
+    public List<DataNodeEntity> loadAllDataNodeEntity() {
+        return drainAndClose(dataNodeEntityMapper.selectAllDataNodes());
+    }
+
+    /**
+     * Load all sink field info through a MyBatis cursor.
+     *
+     * @return all field info returned by the mapper, never {@code null}
+     */
+    @Transactional
+    @Override
+    public List<SortFieldInfo> loadAllFields() {
+        return drainAndClose(streamSinkFieldEntityMapper.selectAllFields());
+    }
+
+    /**
+     * Copy every row of the cursor into a list and close the cursor afterwards,
+     * also when the iteration fails half-way.
+     *
+     * @param cursor cursor to read, closed by this method
+     * @return rows of the cursor in iteration order
+     * @throws java.io.UncheckedIOException if closing the cursor fails
+     */
+    private static <T> List<T> drainAndClose(Cursor<T> cursor) {
+        // Cursor is Closeable: try-with-resources releases it even if forEach throws
+        try (Cursor<T> rows = cursor) {
+            List<T> result = new ArrayList<>();
+            rows.forEach(result::add);
+            return result;
+        } catch (java.io.IOException e) {
+            throw new java.io.UncheckedIOException("failed to close MyBatis cursor", e);
+        }
+    }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index cadf713c5..9059a1b3e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -268,6 +268,13 @@ public class SortSourceServiceImpl implements SortSourceService {
});
sortSourceConfigMap = newConfigMap;
sortSourceMd5Map = newMd5Map;
+ mqClusters = null;
+ groupInfos = null;
+ allStreams = null;
+ backupClusterTag = null;
+ backupGroupMqResource = null;
+ backupStreamMqResource = null;
+ groupMap = null;
}
private Map<String, CacheZone> parseCacheZones(
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index 0ccea9162..51eeb91f6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -21,8 +21,10 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +32,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Default operation of data node.
*/
@@ -75,4 +80,9 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
}
+
+ /**
+ * Default conversion: parse the ext params of the data node with
+ * {@code JsonUtils.parseObject} into a {@code HashMap} and return it as the sink params.
+ * <p/>
+ * NOTE(review): the target class is the raw {@code HashMap}, so the values are not
+ * checked to be strings; confirm that the ext params only hold string values.
+ *
+ * @param info data node info whose ext params are parsed
+ * @return the parsed ext params
+ */
+ @Override
+ public Map<String, String> parse2SinkParams(DataNodeInfo info) {
+ return JsonUtils.parseObject(info.getExtParams(), HashMap.class);
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index ce579994e..aeae290b7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -21,6 +21,8 @@ import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import java.util.Map;
+
/**
* Interface of the data node operator.
*/
@@ -62,4 +64,11 @@ public interface DataNodeOperator {
* @param operator name of operator
*/
void updateOpt(DataNodeRequest request, String operator);
+
+ /**
+ * Parse the data node info into the sink params of sort-standalone.
+ *
+ * @param info data node info
+ * @return sink params
+ */
+ Map<String, String> parse2SinkParams(DataNodeInfo info);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index db6cd23aa..35f0b6b5f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.service.sink;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.pagehelper.Page;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -26,6 +25,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
@@ -41,7 +41,9 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -52,8 +54,9 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSinkOperator.class);
- @Autowired
- protected ObjectMapper objectMapper;
+ protected static final String KEY_GROUP_ID = "inlongGroupId";
+ protected static final String KEY_STREAM_ID = "inlongStreamId";
+
@Autowired
protected StreamSinkEntityMapper sinkMapper;
@Autowired
@@ -213,6 +216,23 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
sinkFieldMapper.logicDeleteAll(entity.getId());
}
+ /**
+ * Parse the ext params of a stream sink into id params, then add the inlong group id and stream id.
+ *
+ * @param streamSink stream sink entity whose ext params are parsed
+ * @param fields not used by this implementation
+ * @return the parsed params plus the group id and stream id, or null if no params could be parsed
+ */
+ @Override
+ public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields) {
+ Map<String, String> param;
+ try {
+ param = JsonUtils.parseObject(streamSink.getExtParams(), HashMap.class);
+ } catch (Exception e) {
+ // the exception is the last argument and has no placeholder, so SLF4J logs its full stack trace
+ LOGGER.error("cannot parse properties of groupId={}, streamId={}, sinkName={}, the row properties is={}",
+ streamSink.getInlongGroupId(), streamSink.getInlongStreamId(),
+ streamSink.getSinkName(), streamSink.getExtParams(), e);
+ return null;
+ }
+ if (param == null) {
+ // NOTE(review): guards against parseObject yielding null (e.g. for empty ext params) - confirm it can;
+ // handled like the unparseable case above instead of failing with a NullPointerException below
+ LOGGER.error("no properties parsed of groupId={}, streamId={}, sinkName={}, the row properties is={}",
+ streamSink.getInlongGroupId(), streamSink.getInlongStreamId(),
+ streamSink.getSinkName(), streamSink.getExtParams());
+ return null;
+ }
+ // put group and stream info
+ param.put(KEY_GROUP_ID, streamSink.getInlongGroupId());
+ param.put(KEY_STREAM_ID, streamSink.getInlongStreamId());
+ return param;
+ }
+
/**
* Check the validity of sink fields.
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index 71ed90842..a2d933595 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
import javax.validation.constraints.NotNull;
import java.util.List;
+import java.util.Map;
/**
* Interface of the sink operator
@@ -98,4 +99,12 @@ public interface StreamSinkOperator {
* @param operator name of the operator
*/
void deleteOpt(StreamSinkEntity entity, String operator);
+
+ /**
+ * Parse stream sink to id params
+ *
+ * @param streamSink stream sink entity to parse
+ * @param fields fields passed along with the sink; NOTE(review): exact meaning not visible here, confirm with callers
+ * @return id params as a map of param name to param value; AbstractSinkOperator returns null when the ext
+ * params of the sink cannot be parsed
+ */
+ Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields);
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index f8eb9eff8..338ee3276 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -20,11 +20,11 @@ package org.apache.inlong.manager.service.sort;
import org.apache.inlong.common.constant.ClusterSwitch;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
-import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
@@ -34,11 +34,16 @@ import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.SortService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.apache.inlong.manager.service.node.DataNodeService;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -52,6 +57,8 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
/**
* Sort service test for {@link SortService}
@@ -85,6 +92,10 @@ public class SortServiceImplTest extends ServiceBaseTest {
private InlongGroupService groupService;
@Autowired
private InlongStreamService streamService;
+ @Autowired
+ private DataNodeService dataNodeService;
+ @Autowired
+ private StreamSinkService streamSinkService;
@Test
@Order(1)
@@ -201,18 +212,18 @@ public class SortServiceImplTest extends ServiceBaseTest {
}
private void prepareDataNode(String taskName) {
- DataNodeEntity entity = new DataNodeEntity();
- entity.setName(taskName);
- entity.setType(TEST_SINK_TYPE);
- entity.setExtParams("{\"paramKey1\":\"paramValue1\"}");
- entity.setCreator(TEST_CREATOR);
- entity.setInCharges(TEST_CREATOR);
- Date now = new Date();
- entity.setCreateTime(now);
- entity.setModifyTime(now);
- entity.setIsDeleted(InlongConstants.UN_DELETED);
- entity.setVersion(InlongConstants.INITIAL_VERSION);
- dataNodeEntityMapper.insert(entity);
+ HiveDataNodeRequest request = new HiveDataNodeRequest();
+ request.setUrl("test_hive_url");
+ request.setName(taskName);
+ request.setExtParams("{\"paramKey1\":\"paramValue1\",\"hdfsUgi\":\"test_hdfsUgi\"}");
+ request.setHdfsPath("testPath");
+ request.setHiveConfDir("testDir");
+ request.setWarehouse("testWareHouse");
+ request.setHdfsUgi("testUgi");
+ request.setInCharges(TEST_CREATOR);
+ request.setUsername("test_hive_user");
+ request.setToken("test_hive_token");
+ dataNodeService.save(request, TEST_CREATOR);
}
private void prepareGroupId(String groupId) {
@@ -265,7 +276,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
private void prepareCluster(String clusterName) {
InlongClusterEntity entity = new InlongClusterEntity();
entity.setName(clusterName);
- entity.setType(TEST_SINK_TYPE);
+ entity.setType(DataNodeType.HIVE);
entity.setExtParams("{}");
entity.setCreator(TEST_CREATOR);
entity.setInCharges(TEST_CREATOR);
@@ -295,22 +306,19 @@ public class SortServiceImplTest extends ServiceBaseTest {
}
private void prepareTask(String taskName, String groupId, String clusterName) {
- StreamSinkEntity entity = new StreamSinkEntity();
- entity.setInlongGroupId(groupId);
- entity.setInlongStreamId("1");
- entity.setSinkType(TEST_SINK_TYPE);
- entity.setSinkName(taskName);
- entity.setInlongClusterName(clusterName);
- entity.setDataNodeName(taskName);
- entity.setSortTaskName(taskName);
- entity.setCreator(TEST_CREATOR);
- Date now = new Date();
- entity.setCreateTime(now);
- entity.setModifyTime(now);
- entity.setIsDeleted(InlongConstants.UN_DELETED);
- entity.setVersion(InlongConstants.INITIAL_VERSION);
- entity.setExtParams("{\"delimiter\":\"|\",\"dataType\":\"text\"}");
- streamSinkEntityMapper.insert(entity);
+ SinkRequest request = new HiveSinkRequest();
+ request.setDataNodeName(taskName);
+ request.setSinkType(SinkType.HIVE);
+ request.setInlongClusterName(clusterName);
+ request.setSinkName(taskName);
+ request.setSortTaskName(taskName);
+ request.setInlongGroupId(groupId);
+ request.setInlongStreamId("1");
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("delimiter", "|");
+ properties.put("dataType", "text");
+ request.setProperties(properties);
+ streamSinkService.save(request, TEST_CREATOR);
}
-}
\ No newline at end of file
+}