You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/17 10:39:59 UTC

[incubator-inlong] branch master updated: [INLONG-3188][Manager] Fix status was not right when CreateSortConfigListener throw an exception (#3189)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 496f6ca  [INLONG-3188][Manager] Fix status was not right when CreateSortConfigListener throw an exception (#3189)
496f6ca is described below

commit 496f6ca90be24bee8f8d24352158c7656df99430
Author: pacino <ge...@gmail.com>
AuthorDate: Thu Mar 17 18:39:52 2022 +0800

    [INLONG-3188][Manager] Fix status was not right when CreateSortConfigListener throw an exception (#3189)
---
 .../thirdparty/sort/CreateSortConfigListener.java  | 35 ++++++++++++----------
 1 file changed, 20 insertions(+), 15 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 8852a37..28fa3bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -63,11 +63,11 @@ public class CreateSortConfigListener implements SortOperateListener {
 
     @Override
     public TaskEvent event() {
-        return TaskEvent.CREATE;
+        return TaskEvent.COMPLETE;
     }
 
     @Override
-    public ListenerResult listen(WorkflowContext context) throws Exception {
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         LOGGER.info("Create sort config for context={}", context);
         ProcessForm form = context.getProcessForm();
         if (form instanceof UpdateGroupProcessForm) {
@@ -90,21 +90,26 @@ public class CreateSortConfigListener implements SortOperateListener {
             return ListenerResult.success();
         }
 
-        Map<String, DataFlowInfo> dataFlowInfoMap = sinkResponseList.stream().map(sink -> {
-                    DataFlowInfo flowInfo = commonOperateService.createDataFlow(groupInfo, sink);
-                    return Pair.of(sink.getInlongStreamId(), flowInfo);
-                }
-        ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+        try {
+            Map<String, DataFlowInfo> dataFlowInfoMap = sinkResponseList.stream().map(sink -> {
+                        DataFlowInfo flowInfo = commonOperateService.createDataFlow(groupInfo, sink);
+                        return Pair.of(sink.getInlongStreamId(), flowInfo);
+                    }
+            ).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
-        String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
-        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
-        extInfo.setInlongGroupId(groupId);
-        extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
-        extInfo.setKeyValue(dataFlows);
-        if (groupInfo.getExtList() == null) {
-            groupInfo.setExtList(Lists.newArrayList());
+            String dataFlows = OBJECT_MAPPER.writeValueAsString(dataFlowInfoMap);
+            InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
+            extInfo.setInlongGroupId(groupId);
+            extInfo.setKeyName(InlongGroupSettings.DATA_FLOW);
+            extInfo.setKeyValue(dataFlows);
+            if (groupInfo.getExtList() == null) {
+                groupInfo.setExtList(Lists.newArrayList());
+            }
+            upsertDataFlow(groupInfo, extInfo);
+        } catch (Exception e) {
+            LOGGER.error("create sort config failed for sink list={} ", sinkResponseList, e);
+            throw new WorkflowListenerException("create sort config failed: " + e.getMessage());
         }
-        upsertDataFlow(groupInfo, extInfo);
 
         return ListenerResult.success();
     }