You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/02 11:54:21 UTC
[inlong] branch master updated: [INLONG-5769][Manager] Fix the FILE type unsupported for SourceOperateListener (#5771)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 02732c069 [INLONG-5769][Manager] Fix the FILE type unsupported for SourceOperateListener (#5771)
02732c069 is described below
commit 02732c0698b25715bf2d68bbfe11a344f98e8100
Author: healchow <he...@gmail.com>
AuthorDate: Fri Sep 2 19:54:15 2022 +0800
[INLONG-5769][Manager] Fix the FILE type unsupported for SourceOperateListener (#5771)
---
.../source/AbstractSourceOperateListener.java | 44 +++++-----------------
1 file changed, 9 insertions(+), 35 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index fdcb430bb..7f1097e18 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -21,18 +21,12 @@ import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaSourceRequest;
-import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
-import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -75,15 +69,16 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
List<StreamSource> unOperatedSources = Lists.newArrayList();
streamResponses.forEach(stream ->
operateStreamSources(groupId, stream.getInlongStreamId(), context.getOperator(), unOperatedSources));
+
if (CollectionUtils.isNotEmpty(unOperatedSources)) {
- GroupOperateType groupOperateType = getOperateType(context.getProcessForm());
- StringBuilder builder = new StringBuilder("Unsupported operate ").append(groupOperateType).append(" for (");
+ GroupOperateType operateType = getOperateType(context.getProcessForm());
+ StringBuilder builder = new StringBuilder("Unsupported operate ").append(operateType).append(" for (");
unOperatedSources.forEach(source -> builder.append(" ").append(source.getSourceName()).append(" "));
String errMsg = builder.append(")").toString();
throw new WorkflowListenerException(errMsg);
- } else {
- return ListenerResult.success();
}
+
+ return ListenerResult.success();
}
/**
@@ -93,10 +88,8 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
List<StreamSource> unOperatedSources) {
List<StreamSource> sources = streamSourceService.listSource(groupId, streamId);
sources.forEach(source -> {
- boolean checkIfOp = checkIfOp(source, unOperatedSources);
- if (checkIfOp) {
- SourceRequest sourceRequest = createSourceRequest(source);
- operateStreamSource(sourceRequest, operator);
+ if (checkIfOp(source, unOperatedSources)) {
+ operateStreamSource(source.genSourceRequest(), operator);
}
});
}
@@ -130,25 +123,6 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
return false;
}
- /**
- * Creat source request by source type.
- *
- * @param streamSource source information
- * @return source request
- */
- public SourceRequest createSourceRequest(StreamSource streamSource) {
- String sourceType = streamSource.getSourceType();
- switch (sourceType) {
- case SourceType.MYSQL_BINLOG:
- return CommonBeanUtils.copyProperties((MySQLBinlogSource) streamSource, MySQLBinlogSourceRequest::new);
- case SourceType.KAFKA:
- return CommonBeanUtils.copyProperties((KafkaSource) streamSource, KafkaSourceRequest::new);
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported type=%s for SourceOperateListener", sourceType));
- }
- }
-
/**
* Operate stream sources ,such as delete, stop, restart.
*/
@@ -159,7 +133,7 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
return ((GroupResourceProcessForm) processForm).getGroupOperateType();
} else {
log.error("illegal process form {} to get inlong group info", processForm.getFormName());
- throw new RuntimeException("Unsupported ProcessForm " + processForm.getFormName());
+ throw new RuntimeException("Unsupported process form " + processForm.getFormName());
}
}
@@ -169,7 +143,7 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
return groupResourceProcessForm.getGroupInfo();
} else {
log.error("illegal process form {} to get inlong group info", processForm.getFormName());
- throw new RuntimeException("Unsupported ProcessForm " + processForm.getFormName());
+ throw new RuntimeException("Unsupported process form " + processForm.getFormName());
}
}