You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/05 05:58:50 UTC

[incubator-inlong] branch master updated: [INLONG-2903][Manager] Add inlong stream info into GroupResourceProcessForm (#2909)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cd7122  [INLONG-2903][Manager] Add inlong stream info into GroupResourceProcessForm (#2909)
0cd7122 is described below

commit 0cd71226f94a749bbc370118e62299d8f1acdb33
Author: pacino <ge...@gmail.com>
AuthorDate: Sat Mar 5 13:58:47 2022 +0800

    [INLONG-2903][Manager] Add inlong stream info into GroupResourceProcessForm (#2909)
---
 .../manager/common/pojo/stream/InlongStreamInfo.java    |  3 +++
 .../pojo/workflow/form/GroupResourceProcessForm.java    |  7 ++++++-
 .../pojo/workflow/form/UpdateGroupProcessForm.java      |  4 ++++
 .../manager/dao/mapper/InlongStreamEntityMapper.java    |  3 ++-
 .../service/core/impl/InlongGroupProcessOperation.java  | 17 ++++++++++++++++-
 .../group/listener/StartCreateGroupProcessListener.java | 14 +++++++++++++-
 6 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
index 0f13c7a..7f2724b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamInfo.java
@@ -67,6 +67,9 @@ public class InlongStreamInfo extends InlongStreamBaseInfo {
     @ApiModelProperty(value = "(File and DB access) Whether there are predefined fields, 0: no, 1: yes")
     private Integer havePredefinedFields;
 
+    @ApiModelProperty(value = "order_preserving 0: none, 1: yes")
+    private Integer syncSend;
+
     @ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
     private Integer dailyRecords;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
index 3a5a4a2..ef7dd15 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/GroupResourceProcessForm.java
@@ -18,11 +18,14 @@
 package org.apache.inlong.manager.common.pojo.workflow.form;
 
 import com.google.common.collect.Maps;
-import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.inlong.manager.common.exceptions.FormValidateException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+
+import java.util.List;
+import java.util.Map;
 
 /**
  * Form of create inlong group resource
@@ -37,6 +40,8 @@ public class GroupResourceProcessForm extends BaseProcessForm {
 
     private String streamId;
 
+    private List<InlongStreamInfo> inlongStreamInfoList;
+
     public InlongGroupInfo getGroupInfo() {
         return groupInfo;
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
index 72191a1..f424eff 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/UpdateGroupProcessForm.java
@@ -25,8 +25,10 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.inlong.manager.common.exceptions.FormValidateException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.util.Preconditions;
 
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
@@ -42,6 +44,8 @@ public class UpdateGroupProcessForm extends BaseProcessForm {
     @ApiModelProperty(value = "OperateType to define the update operation", required = true)
     private OperateType operateType;
 
+    private List<InlongStreamInfo> inlongStreamInfoList;
+
     @Override
     public void validate() throws FormValidateException {
         Preconditions.checkNotNull(groupInfo, "inlong group info is empty");
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
index 28d55bd..b76d7e2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamEntityMapper.java
@@ -17,13 +17,14 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
-import java.util.List;
 import org.apache.ibatis.annotations.Param;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamTopicResponse;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.springframework.stereotype.Repository;
 
+import java.util.List;
+
 @Repository
 public interface InlongStreamEntityMapper {
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
index a8fb3ca..d16149f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
@@ -17,18 +17,21 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
-import java.util.List;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.pojo.workflow.form.NewGroupProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm.OperateType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
@@ -38,6 +41,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+
 /**
  * Operation related to inlong group process
  */
@@ -52,6 +57,9 @@ public class InlongGroupProcessOperation {
     @Autowired
     private InlongStreamService streamService;
 
+    @Autowired
+    private InlongStreamEntityMapper streamMapper;
+
     /**
      * Allocate resource application groups for access services and initiate an approval process
      *
@@ -136,6 +144,13 @@ public class InlongGroupProcessOperation {
     private UpdateGroupProcessForm genUpdateGroupProcessForm(InlongGroupInfo groupInfo,
             OperateType operateType) {
         UpdateGroupProcessForm updateForm = new UpdateGroupProcessForm();
+        if (OperateType.RESTART == operateType) {
+            List<InlongStreamEntity> inlongStreamEntityList =
+                    streamMapper.selectByGroupId(groupInfo.getInlongGroupId());
+            List<InlongStreamInfo> inlongStreamInfoList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
+                    InlongStreamInfo::new);
+            updateForm.setInlongStreamInfoList(inlongStreamInfoList);
+        }
         updateForm.setGroupInfo(groupInfo);
         updateForm.setOperateType(operateType);
         return updateForm;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
index 24b7ab3..b693a65 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/StartCreateGroupProcessListener.java
@@ -19,8 +19,12 @@ package org.apache.inlong.manager.service.workflow.group.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.NewGroupProcessForm;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
@@ -31,6 +35,8 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * After the new inlong group is approved, initiate a listener for other processes
  */
@@ -43,6 +49,9 @@ public class StartCreateGroupProcessListener implements ProcessEventListener {
     @Autowired
     private WorkflowService workflowService;
 
+    @Autowired
+    private InlongStreamEntityMapper streamMapper;
+
     @Override
     public ProcessEvent event() {
         return ProcessEvent.COMPLETE;
@@ -59,8 +68,11 @@ public class StartCreateGroupProcessListener implements ProcessEventListener {
         GroupResourceProcessForm processForm = new GroupResourceProcessForm();
         processForm.setGroupInfo(groupService.get(groupId));
         String username = context.getApplicant();
+        List<InlongStreamEntity> inlongStreamEntityList = streamMapper.selectByGroupId(groupId);
+        List<InlongStreamInfo> inlongStreamInfoList = CommonBeanUtils.copyListProperties(inlongStreamEntityList,
+                InlongStreamInfo::new);
+        processForm.setInlongStreamInfoList(inlongStreamInfoList);
         workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm);
-
         return ListenerResult.success();
     }