You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/10 14:01:06 UTC

[incubator-inlong] branch master updated: [INLONG-3042][Manager] Supplements of binlog all migration (#3043)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aa1d581  [INLONG-3042][Manager] Supplements of binlog all migration (#3043)
aa1d581 is described below

commit aa1d5819926d02d57004c03914698e5b6ed2b2d5
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Mar 10 21:59:07 2022 +0800

    [INLONG-3042][Manager] Supplements of binlog all migration (#3043)
---
 .../inlong/manager/common/enums/GroupState.java    |  3 ++-
 .../thirdparty/sort/CreateSortConfigListener.java  | 10 ++++++++-
 .../thirdparty/sort/util/SinkInfoUtils.java        |  4 ++--
 .../thirdparty/sort/util/SourceInfoUtils.java      | 16 +++++++++++----
 .../group/DeleteGroupWorkflowDefinition.java       |  4 ++--
 .../group/RestartGroupWorkflowDefinition.java      | 24 +++++++++++-----------
 6 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupState.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupState.java
index 21108e0..8a074b7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupState.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupState.java
@@ -113,7 +113,8 @@ public enum GroupState {
     }
 
     public static boolean isAllowedLogicDel(GroupState state) {
-        return state == GroupState.GROUP_DRAFT || state == GroupState.GROUP_WAIT_SUBMIT;
+        return state == GroupState.GROUP_DRAFT || state == GroupState.GROUP_WAIT_SUBMIT
+                || state == GroupState.GROUP_DELETE || state == GroupState.GROUP_FINISH;
     }
 
     public Integer getCode() {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index a752757..82ab24b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -17,13 +17,13 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
@@ -38,6 +38,7 @@ import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm.OperateType;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -90,6 +91,13 @@ public class CreateSortConfigListener implements SortOperateListener {
     public ListenerResult listen(WorkflowContext context) throws Exception {
         LOGGER.info("Create sort config for context={}", context);
         ProcessForm form = context.getProcessForm();
+        if (form instanceof UpdateGroupProcessForm) {
+            UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) form;
+            OperateType operateType = updateGroupProcessForm.getOperateType();
+            if (operateType == OperateType.SUSPEND || operateType == OperateType.DELETE) {
+                return ListenerResult.success();
+            }
+        }
         InlongGroupInfo groupInfo = this.getGroupInfo(form);
         String groupId = groupInfo.getInlongGroupId();
         if (StringUtils.isEmpty(groupId)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index 1edb0a8..525ef63 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -120,7 +120,7 @@ public class SinkInfoUtils {
             KafkaSinkResponse sinkResponse) {
         List<FieldInfo> fieldInfoList = Lists.newArrayList();
         if (isAllMigration) {
-            fieldInfoList.add(SourceInfoUtils.getAllMigrationBuiltInField());
+            fieldInfoList = SourceInfoUtils.getAllMigrationBuiltInField();
         } else {
             fieldInfoList = getSinkFields(sinkResponse.getFieldList(), null);
         }
@@ -192,7 +192,7 @@ public class SinkInfoUtils {
         // Get the sink field, if there is no partition field in the source field, add the partition field to the end
         List<FieldInfo> fieldInfoList = Lists.newArrayList();
         if (isAllMigration) {
-            fieldInfoList.add(SourceInfoUtils.getAllMigrationBuiltInField());
+            fieldInfoList = SourceInfoUtils.getAllMigrationBuiltInField();
         } else {
             fieldInfoList = getSinkFields(hiveInfo.getFieldList(), hiveInfo.getPrimaryPartition());
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index f850f94..dbd4d58 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -83,9 +83,17 @@ public class SourceInfoUtils {
     /**
      * Get all migration built-in field for binlog source.
      */
-    public static BuiltInFieldInfo getAllMigrationBuiltInField() {
-        return new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
-                BuiltInField.MYSQL_METADATA_DATA);
+    public static List<FieldInfo> getAllMigrationBuiltInField() {
+        List<FieldInfo> list = Lists.newArrayList();
+        list.add(new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
+                BuiltInField.MYSQL_METADATA_DATA));
+        for (Map.Entry<String, BuiltInField> entry : BUILT_IN_FIELD_MAP.entrySet()) {
+            if (entry.getKey().equals("data_time")) {
+                continue;
+            }
+            list.add(new BuiltInFieldInfo(entry.getKey(), StringFormatInfo.INSTANCE, entry.getValue()));
+        }
+        return list;
     }
 
     /**
@@ -98,7 +106,7 @@ public class SourceInfoUtils {
         List<FieldInfo> fieldInfos = Lists.newArrayList();
         boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
         if (isAllMigration) {
-            fieldInfos.add(SourceInfoUtils.getAllMigrationBuiltInField());
+            fieldInfos = SourceInfoUtils.getAllMigrationBuiltInField();
         } else {
             if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
                 fieldInfos = getSourceFields(sinkResponse.getFieldList());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index ba29865..2c55f30 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -63,7 +63,7 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        //stop datasource
+        //delete datasource
         ServiceTask deleteDataSourceTask = new ServiceTask();
         deleteDataSourceTask.setName("deleteSource");
         deleteDataSourceTask.setDisplayName("Group-DeleteSource");
@@ -71,7 +71,7 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
         deleteDataSourceTask.addListenerProvider(serviceTaskListenerFactory);
         process.addTask(deleteDataSourceTask);
 
-        //stop sort
+        //delete sort
         ServiceTask deleteSortTask = new ServiceTask();
         deleteSortTask.setName("deleteSort");
         deleteSortTask.setDisplayName("Group-DeleteSort");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
index 50ac17a..9a9a9f2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
@@ -65,15 +65,7 @@ public class RestartGroupWorkflowDefinition implements WorkflowDefinition {
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        //stop datasource
-        ServiceTask restartDataSourceTask = new ServiceTask();
-        restartDataSourceTask.setName("restartSource");
-        restartDataSourceTask.setDisplayName("Group-RestartSource");
-        restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
-        restartDataSourceTask.addListenerProvider(serviceTaskListenerFactory);
-        process.addTask(restartDataSourceTask);
-
-        //stop sort
+        //restart sort
         ServiceTask restartSortTask = new ServiceTask();
         restartSortTask.setName("restartSort");
         restartSortTask.setDisplayName("Group-RestartSort");
@@ -81,13 +73,21 @@ public class RestartGroupWorkflowDefinition implements WorkflowDefinition {
         restartSortTask.addListenerProvider(serviceTaskListenerFactory);
         process.addTask(restartSortTask);
 
+        //restart datasource
+        ServiceTask restartDataSourceTask = new ServiceTask();
+        restartDataSourceTask.setName("restartSource");
+        restartDataSourceTask.setDisplayName("Group-RestartSource");
+        restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
+        restartDataSourceTask.addListenerProvider(serviceTaskListenerFactory);
+        process.addTask(restartDataSourceTask);
+
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
 
-        startEvent.addNext(restartDataSourceTask);
-        restartDataSourceTask.addNext(restartSortTask);
-        restartSortTask.addNext(endEvent);
+        startEvent.addNext(restartSortTask);
+        restartSortTask.addNext(restartDataSourceTask);
+        restartDataSourceTask.addNext(endEvent);
 
         return process;
     }