You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/09/07 07:34:12 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 45b4f6da8 [INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)
45b4f6da8 is described below
commit 45b4f6da843ffa244bdd67b4c05c8c5818708b3c
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Sep 7 13:57:54 2022 +0800
[INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)
Co-authored-by: healchow <he...@gmail.com>
---
.../service/listener/source/AbstractSourceOperateListener.java | 5 +++++
.../inlong/manager/service/source/StreamSourceServiceImpl.java | 3 ---
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index 7f1097e18..c07c86905 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -99,6 +99,11 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
*/
@SneakyThrows
public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
+ // if a source has sub-sources, it is considered a template source.
+ // template sources do not need to be operated on; their sub-sources will be processed in this method later.
+ if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
+ return false;
+ }
for (int retry = 0; retry < 60; retry++) {
int status = streamSource.getStatus();
SourceStatus sourceStatus = SourceStatus.forCode(status);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 04a64e7ac..5f785fe2c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -237,7 +237,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
@@ -272,7 +271,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.info("begin to restart source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
SourceRequest sourceRequest = new SourceRequest();
@@ -290,7 +288,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.info("begin to stop source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
SourceRequest sourceRequest = new SourceRequest();