You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/17 10:39:59 UTC
[incubator-inlong] branch master updated: [INLONG-3188][Manager] Fix status was not right when CreateSortConfigListener throw an exception (#3189)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 496f6ca [INLONG-3188][Manager] Fix status was not right when CreateSortConfigListener throw an exception (#3189)
496f6ca is described below
commit 496f6ca90be24bee8f8d24352158c7656df99430
Author: pacino <ge...@gmail.com>
AuthorDate: Thu Mar 17 18:39:52 2022 +0800
[INLONG-3188][Manager] Fix status was not right when CreateSortConfigListener throw an exception (#3189)
---
.../thirdparty/sort/CreateSortConfigListener.java | 35 ++++++++++++----------
1 file changed, 20 insertions(+), 15 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 8852a37..28fa3bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -63,11 +63,11 @@ public class CreateSortConfigListener implements SortOperateListener {
@Override
public TaskEvent event() {
- return TaskEvent.CREATE;
+ return TaskEvent.COMPLETE;
}
@Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
+ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
LOGGER.info("Create sort config for context={}", context);
ProcessForm form = context.getProcessForm();
if (form instanceof UpdateGroupProcessForm) {
@@ -90,21 +90,26 @@ public class CreateSortConfigListener implements SortOperateListener {
return ListenerResult.success();
}
- Map<String, DataFlowInfo> dataFlowInfoMap = sinkResponseList.stream().map(sink -> {
- DataFlowInfo flowInfo = commonOperateService.createDataFlow(groupInfo, sink);
- return Pair.of(sink.getInlongStreamId(), flowInfo);
- }
- ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ try {
+ Map<String, DataFlowInfo> dataFlowInfoMap = sinkResponseList.stream().map(sink -> {
+ DataFlowInfo flowInfo = commonOperateService.createDataFlow(groupInfo, sink);
+ return Pair.of(sink.getInlongStreamId(), flowInfo);
+ }
+ ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
- String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
- InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
- extInfo.setInlongGroupId(groupId);
- extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
- extInfo.setKeyValue(dataFlows);
- if (groupInfo.getExtList() == null) {
- groupInfo.setExtList(Lists.newArrayList());
+ String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
+ InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+ extInfo.setInlongGroupId(groupId);
+ extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+ extInfo.setKeyValue(dataFlows);
+ if (groupInfo.getExtList() == null) {
+ groupInfo.setExtList(Lists.newArrayList());
+ }
+ upsertDataFlow(groupInfo, extInfo);
+ } catch (Exception e) {
+ LOGGER.error("create sort config failed for sink list={} ", sinkResponseList, e);
+ throw new WorkflowListenerException("create sort config failed: " + e.getMessage());
}
- upsertDataFlow(groupInfo, extInfo);
return ListenerResult.success();
}