You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/28 02:16:30 UTC

[incubator-inlong] branch master updated: [INLONG-3981][Manager] Fix state check of lightweight group (#3982)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6991707e5 [INLONG-3981][Manager] Fix state check of lightweight group (#3982)
6991707e5 is described below

commit 6991707e5a0a8f2c78dc8b9f726709598726f213
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Apr 28 10:16:24 2022 +0800

    [INLONG-3981][Manager] Fix state check of lightweight group (#3982)
---
 .../core/impl/InlongGroupProcessOperation.java      |  4 ++--
 .../manager/service/sort/util/ExtractNodeUtils.java |  5 ++++-
 .../manager/service/sort/util/LoadNodeUtils.java    | 21 +++++++++++++++++++--
 .../service/sort/util/TransformNodeUtils.java       |  2 +-
 .../listener/light/LightGroupCompleteListener.java  | 11 +++++++++++
 .../listener/light/LightGroupFailedListener.java    |  6 ++++++
 .../light/LightGroupUpdateCompleteListener.java     | 14 ++++++++++----
 .../listener/light/LightGroupUpdateListener.java    |  1 -
 8 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
index 9d5aa3848..712a73ea8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
@@ -160,7 +160,7 @@ public class InlongGroupProcessOperation {
             case LIGHT:
                 LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.RESTART);
                 executorService.execute(
-                        () -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, lightForm));
+                        () -> workflowService.start(ProcessName.RESTART_LIGHT_GROUP_PROCESS, operator, lightForm));
                 break;
             default:
                 throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
@@ -237,7 +237,7 @@ public class InlongGroupProcessOperation {
             case LIGHT:
                 LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo,
                         GroupOperateType.DELETE);
-                workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, lightForm);
+                workflowService.start(ProcessName.DELETE_LIGHT_GROUP_PROCESS, operator, lightForm);
                 break;
             default:
                 throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 572777847..817381f34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -85,7 +85,10 @@ public class ExtractNodeUtils {
         String userName = binlogSourceResponse.getUser();
         String password = binlogSourceResponse.getPassword();
         Integer port = binlogSourceResponse.getPort();
-        Integer serverId = binlogSourceResponse.getServerId();
+        Integer serverId = null;
+        if (binlogSourceResponse.getServerId() != null && binlogSourceResponse.getServerId() > 0) {
+            serverId = binlogSourceResponse.getServerId();
+        }
         String tables = binlogSourceResponse.getTableWhiteList();
         List<String> tableNames = Splitter.on(",").splitToList(tables);
         List<InlongStreamFieldInfo> streamFieldInfos = binlogSourceResponse.getFieldList();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index e56feafb9..928e701ce 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -72,8 +72,7 @@ public class LoadNodeUtils {
                 .map(sinkFieldResponse -> new FieldInfo(sinkFieldResponse.getFieldName(), name,
                         FieldInfoUtils.convertFieldFormat(sinkFieldResponse.getFieldType(),
                                 sinkFieldResponse.getFieldFormat()))).collect(Collectors.toList());
-        List<FieldRelationShip> fieldRelationShips = fieldInfos.stream()
-                .map(fieldInfo -> new FieldRelationShip(fieldInfo, fieldInfo)).collect(Collectors.toList());
+        List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
         Map<String, String> properties = kafkaSinkResponse.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         Integer sinkParallelism = null;
@@ -114,4 +113,22 @@ public class LoadNodeUtils {
                 properties,
                 primaryKey);
     }
+
+    public static List<FieldRelationShip> parseSinkFields(List<SinkFieldResponse> sinkFieldResponses, String sinkName) {
+        if (CollectionUtils.isEmpty(sinkFieldResponses)) {
+            return Lists.newArrayList();
+        }
+        return sinkFieldResponses.stream().map(sinkFieldResponse -> {
+            String fieldName = sinkFieldResponse.getFieldName();
+            String fieldType = sinkFieldResponse.getFieldType();
+            String fieldFormat = sinkFieldResponse.getFieldFormat();
+            FieldInfo sinkField = new FieldInfo(fieldName, sinkName,
+                    FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
+            String sourceFieldName = sinkFieldResponse.getSourceFieldName();
+            String sourceFieldType = sinkFieldResponse.getSourceFieldType();
+            FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
+                    FieldInfoUtils.convertFieldFormat(sourceFieldType));
+            return new FieldRelationShip(sourceField, sinkField);
+        }).collect(Collectors.toList());
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index e913229cb..8952414b7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -93,7 +93,7 @@ public class TransformNodeUtils {
                 throw new UnsupportedOperationException(
                         String.format("Unsupported deduplication strategy=%s for inlong", deDuplicationStrategy));
         }
-        TransformNode transformNode = createTransformNode(transformResponse);
+        TransformNode transformNode = createNormalTransformNode(transformResponse);
         return new DistinctNode(transformNode.getId(),
                 transformNode.getName(),
                 transformNode.getFields(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
index 93038e480..dacb14dfc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
@@ -19,8 +19,12 @@ package org.apache.inlong.manager.service.workflow.group.listener.light;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.pojo.workflow.form.LightGroupResourceProcessForm;
 import org.apache.inlong.manager.service.core.InlongGroupService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -34,6 +38,10 @@ public class LightGroupCompleteListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
+    @Autowired
+    private InlongStreamService streamService;
+    @Autowired
+    private StreamSourceService sourceService;
 
     @Override
     public ProcessEvent event() {
@@ -48,6 +56,9 @@ public class LightGroupCompleteListener implements ProcessEventListener {
         // Update inlong group status and other info
         groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
         groupService.update(form.getGroupInfo().genRequest(), applicant);
+        // Update status of other related configs
+        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
+        sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), applicant);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
index dd0e547b0..9a585259e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
@@ -19,8 +19,10 @@ package org.apache.inlong.manager.service.workflow.group.listener.light;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.pojo.workflow.form.LightGroupResourceProcessForm;
 import org.apache.inlong.manager.service.core.InlongGroupService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -34,6 +36,8 @@ public class LightGroupFailedListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
+    @Autowired
+    private InlongStreamService streamService;
 
     @Override
     public ProcessEvent event() {
@@ -48,6 +52,8 @@ public class LightGroupFailedListener implements ProcessEventListener {
         // Update inlong group status
         groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), applicant);
         groupService.update(form.getGroupInfo().genRequest(), applicant);
+        // Update inlong stream status
+        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_FAILED.getCode(), applicant);
         return ListenerResult.fail();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
index baeb2277c..bd47438b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
@@ -21,10 +21,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.LightGroupResourceProcessForm;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -40,9 +42,10 @@ public class LightGroupUpdateCompleteListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
-
     @Autowired
     private InlongStreamService streamService;
+    @Autowired
+    private StreamSourceService sourceService;
 
     @Override
     public ProcessEvent event() {
@@ -57,13 +60,16 @@ public class LightGroupUpdateCompleteListener implements ProcessEventListener {
         GroupOperateType groupOperateType = form.getGroupOperateType();
         switch (groupOperateType) {
             case SUSPEND:
-                groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), applicant);
+                groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), applicant);
+                sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_FROZEN.getCode(), applicant);
                 break;
             case RESTART:
-                groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), applicant);
+                groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), applicant);
+                sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), applicant);
                 break;
             case DELETE:
-                groupService.updateStatus(groupId, GroupStatus.DELETING.getCode(), applicant);
+                groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), applicant);
+                sourceService.logicDeleteAll(groupId, null, applicant);
                 break;
             default:
                 break;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
index da5788390..ee77a0558 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
@@ -40,7 +40,6 @@ public class LightGroupUpdateListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
-
     @Autowired
     private InlongStreamService streamService;