You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/09 06:14:03 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5836][Manager] Fix the source status related behavior problems (#5837)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 872b7de60 [INLONG-5836][Manager] Fix the source status related behavior problems (#5837)
872b7de60 is described below
commit 872b7de6049f73ee8ec795a73c1e585a1f0ef104
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Sep 9 12:45:51 2022 +0800
[INLONG-5836][Manager] Fix the source status related behavior problems (#5837)
---
.../inlong/manager/common/enums/SourceStatus.java | 54 +++++++++++-----------
.../dao/mapper/StreamSourceEntityMapper.java | 2 +-
.../resources/mappers/StreamSourceEntityMapper.xml | 2 +-
.../inlong/manager/pojo/source/SourceRequest.java | 5 ++
.../service/core/impl/AgentServiceImpl.java | 6 +--
.../source/AbstractSourceOperateListener.java | 9 ++--
.../listener/source/SourceRestartListener.java | 6 +++
.../listener/source/SourceStopListener.java | 6 +++
.../service/source/AbstractSourceOperator.java | 24 ++++++----
.../service/source/StreamSourceServiceImpl.java | 7 ++-
10 files changed, 71 insertions(+), 50 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
index 1561af0ec..5d5182d41 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
@@ -104,33 +104,33 @@ public enum SourceStatus {
SOURCE_STATE_AUTOMATON.put(SOURCE_FROZEN, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FROZEN, TO_BE_ISSUED_ACTIVE));
// [xxx] to be issued
- HashSet<SourceStatus> tobeAdd = Sets.newHashSet(BEEN_ISSUED_ADD);
- tobeAdd.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAdd);
- HashSet<SourceStatus> tobeDelete = Sets.newHashSet(BEEN_ISSUED_DELETE);
- tobeDelete.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDelete));
- HashSet<SourceStatus> tobeRetry = Sets.newHashSet(BEEN_ISSUED_RETRY);
- tobeRetry.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetry));
- HashSet<SourceStatus> tobeBacktrack = Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
- tobeBacktrack.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrack));
- HashSet<SourceStatus> tobeFrozen = Sets.newHashSet(BEEN_ISSUED_FROZEN);
- tobeFrozen.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(tobeFrozen));
- HashSet<SourceStatus> tobeActive = Sets.newHashSet(BEEN_ISSUED_ACTIVE);
- tobeActive.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActive));
- HashSet<SourceStatus> tobeCheck = Sets.newHashSet(BEEN_ISSUED_CHECK);
- tobeCheck.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheck));
- HashSet<SourceStatus> tobeRedoMetric = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
- tobeRedoMetric.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetric));
- HashSet<SourceStatus> tobeMakeup = Sets.newHashSet(BEEN_ISSUED_MAKEUP);
- tobeMakeup.addAll(TOBE_ISSUED_SET);
- SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeup));
+ HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, SOURCE_DISABLE);
+ tobeAddNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAddNext);
+ HashSet<SourceStatus> tobeDeleteNext = Sets.newHashSet(BEEN_ISSUED_DELETE);
+ tobeDeleteNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDeleteNext));
+ HashSet<SourceStatus> tobeRetryNext = Sets.newHashSet(BEEN_ISSUED_RETRY);
+ tobeRetryNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetryNext));
+ HashSet<SourceStatus> tobeBacktrackNext = Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
+ tobeBacktrackNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrackNext));
+ HashSet<SourceStatus> tobeFrozenNext = Sets.newHashSet(BEEN_ISSUED_FROZEN);
+ tobeFrozenNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(tobeFrozenNext));
+ HashSet<SourceStatus> tobeActiveNext = Sets.newHashSet(BEEN_ISSUED_ACTIVE);
+ tobeActiveNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActiveNext));
+ HashSet<SourceStatus> tobeCheckNext = Sets.newHashSet(BEEN_ISSUED_CHECK);
+ tobeCheckNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheckNext));
+ HashSet<SourceStatus> tobeRedoMetricNext = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
+ tobeRedoMetricNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetricNext));
+ HashSet<SourceStatus> tobeMakeupNext = Sets.newHashSet(BEEN_ISSUED_MAKEUP);
+ tobeMakeupNext.addAll(TOBE_ISSUED_SET);
+ SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeupNext));
// [xxx] been issued
SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 3b0144d3d..8cc63e4e3 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -78,7 +78,7 @@ public interface StreamSourceEntityMapper {
*/
List<StreamSourceEntity> selectByAgentIpOrCluster(@Param("statusList") List<Integer> statusList,
@Param("sourceTypeList") List<String> sourceTypeList, @Param("agentIp") String agentIp,
- @Param("clusterName") String clusterName, @Param("limit") int limit);
+ @Param("clusterName") String clusterName);
/**
* Query the sources with status 20x by the given agent IP and agent UUID.
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c1b955f57..07bb4e8d4 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -222,7 +222,6 @@
</foreach>
</if>
and (agent_ip = #{agentIp, jdbcType=VARCHAR} or inlong_cluster_name = #{clusterName, jdbcType=VARCHAR})
- limit #{limit, jdbcType=INTEGER}
</where>
</select>
<select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -372,6 +371,7 @@
<if test="streamId != null">
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</if>
+ and source_type != 'AUTO_PUSH'
</where>
</update>
<update id="updateIpAndUuid">
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 42735478e..46b7c338d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.source;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@@ -90,4 +91,8 @@ public class SourceRequest {
@ApiModelProperty("Other properties if needed")
private Map<String, Object> properties = new LinkedHashMap<>();
+ @JsonIgnore
+ @ApiModelProperty("Sub source information of existing agents")
+ private List<SubSourceDTO> subSourceList;
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 91b3f7238..5e56c385d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -215,15 +215,15 @@ public class AgentServiceImpl implements AgentService {
Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
"both agent ip and cluster name are blank when fetching file task");
List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
- Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,TASK_FETCH_SIZE * 10);
+ Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName);
List<DataConfig> fileTasks = Lists.newArrayList();
for (StreamSourceEntity sourceEntity : sourceEntities) {
FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(sourceEntity.getExtParams());
final String destIp = sourceEntity.getAgentIp();
final String destClusterName = sourceEntity.getInlongClusterName();
- // Cluster name is blank
- if (StringUtils.isNotBlank(destIp) && StringUtils.isBlank(destClusterName)) {
+ // Use ip directly if it is not empty
+ if (StringUtils.isNotBlank(destIp)) {
if (destIp.equals(agentIp)) {
int op = getOp(sourceEntity.getStatus());
int nextStatus = getNextStatus(sourceEntity.getStatus());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index c07c86905..01eca1ba7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -99,15 +99,12 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
*/
@SneakyThrows
public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
- // if a source has sub-sources, it is considered a template source.
- // template sources do not need to be operated, its sub-sources will be processed in this method later.
- if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
- return false;
- }
for (int retry = 0; retry < 60; retry++) {
int status = streamSource.getStatus();
SourceStatus sourceStatus = SourceStatus.forCode(status);
- if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_FROZEN) {
+ // template sources are filtered and processed in corresponding subclass listeners
+ if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_FROZEN
+ || CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
return true;
} else if (sourceStatus == SourceStatus.SOURCE_FAILED || sourceStatus == SourceStatus.SOURCE_DISABLE) {
return false;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
index ec7170f48..54229273d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.listener.source;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -47,6 +48,11 @@ public class SourceRestartListener extends AbstractSourceOperateListener {
@Override
public void operateStreamSource(SourceRequest sourceRequest, String operator) {
+ // if a source has sub-sources, it is considered a template source.
+ // template sources do not need to be restarted; their sub-sources will be processed in this method later.
+ if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) {
+ return;
+ }
streamSourceService.restart(sourceRequest.getId(), operator);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
index 0ccdbc5b3..7e2aa08f8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.listener.source;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -47,6 +48,11 @@ public class SourceStopListener extends AbstractSourceOperateListener {
@Override
public void operateStreamSource(SourceRequest sourceRequest, String operator) {
+ // if a source has sub-sources, it is considered a template source.
+ // template sources do not need to be stopped; their sub-sources will be processed in this method later.
+ if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) {
+ return;
+ }
streamSourceService.stop(sourceRequest.getId(), operator);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 096a57deb..0d4a3f0b3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -78,13 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
@Transactional(rollbackFor = Throwable.class)
public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
- if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
- if (request.getSourceType().equals(SourceType.AUTO_PUSH)) {
- // auto push task needs not be issued to agent
- entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
- } else {
- entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
- }
+ if (SourceType.AUTO_PUSH.equals(request.getSourceType())) {
+ // auto push tasks need not be issued to the agent
+ entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
+ } else if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+ entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
} else {
entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
}
@@ -121,6 +119,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
public void updateOpt(SourceRequest request, Integer groupStatus, String operator) {
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
+
+ if (SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
+ LOGGER.warn("auto push source {} can not be updated", entity.getSourceName());
+ return;
+ }
if (!SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus())) {
throw new BusinessException(String.format("source=%s is not allowed to update, "
+ "please wait until its changed to final status or stop / frozen / delete it firstly", entity));
@@ -132,7 +135,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
- // Source type cannot be changed
+ // source type cannot be changed
if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
throw new BusinessException(String.format("source type=%s cannot change to %s",
entity.getSourceType(), request.getSourceType()));
@@ -150,11 +153,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
}
}
- // Setting updated parameters of stream source entity.
+ // setting updated parameters of stream source entity.
setTargetEntity(request, entity);
entity.setModifier(operator);
- // Re-issue task if necessary
+ // re-issue task if necessary
entity.setPreviousStatus(entity.getStatus());
if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
@@ -171,6 +174,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
break;
}
}
+
int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
LOGGER.warn(errMsg);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 5f785fe2c..c805ed1f4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
@@ -237,12 +238,14 @@ public class StreamSourceServiceImpl implements StreamSourceService {
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
+ boolean isTemplateSource = CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id));
SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
- // if source is frozen|failed|new , delete directly
+ // if source is frozen|failed|new, or if it is a template source or auto push source, delete directly
if (curStatus == SourceStatus.SOURCE_FROZEN || curStatus == SourceStatus.SOURCE_FAILED
- || curStatus == SourceStatus.SOURCE_NEW) {
+ || curStatus == SourceStatus.SOURCE_NEW || isTemplateSource
+ || SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
nextStatus = SourceStatus.SOURCE_DISABLE;
}
if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {