You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/10 14:01:06 UTC
[incubator-inlong] branch master updated: [INLONG-3042][Manager] Supplements of binlog all migration (#3043)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new aa1d581 [INLONG-3042][Manager] Supplements of binlog all migration (#3043)
aa1d581 is described below
commit aa1d5819926d02d57004c03914698e5b6ed2b2d5
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Mar 10 21:59:07 2022 +0800
[INLONG-3042][Manager] Supplements of binlog all migration (#3043)
---
.../inlong/manager/common/enums/GroupState.java | 3 ++-
.../thirdparty/sort/CreateSortConfigListener.java | 10 ++++++++-
.../thirdparty/sort/util/SinkInfoUtils.java | 4 ++--
.../thirdparty/sort/util/SourceInfoUtils.java | 16 +++++++++++----
.../group/DeleteGroupWorkflowDefinition.java | 4 ++--
.../group/RestartGroupWorkflowDefinition.java | 24 +++++++++++-----------
6 files changed, 39 insertions(+), 22 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupState.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupState.java
index 21108e0..8a074b7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupState.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupState.java
@@ -113,7 +113,8 @@ public enum GroupState {
}
public static boolean isAllowedLogicDel(GroupState state) {
- return state == GroupState.GROUP_DRAFT || state == GroupState.GROUP_WAIT_SUBMIT;
+ return state == GroupState.GROUP_DRAFT || state == GroupState.GROUP_WAIT_SUBMIT
+ || state == GroupState.GROUP_DELETE || state == GroupState.GROUP_FINISH;
}
public Integer getCode() {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index a752757..82ab24b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -17,13 +17,13 @@
package org.apache.inlong.manager.service.thirdparty.sort;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
@@ -38,6 +38,7 @@ import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm.OperateType;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -90,6 +91,13 @@ public class CreateSortConfigListener implements SortOperateListener {
public ListenerResult listen(WorkflowContext context) throws Exception {
LOGGER.info("Create sort config for context={}", context);
ProcessForm form = context.getProcessForm();
+ if (form instanceof UpdateGroupProcessForm) {
+ UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) form;
+ OperateType operateType = updateGroupProcessForm.getOperateType();
+ if (operateType == OperateType.SUSPEND || operateType == OperateType.DELETE) {
+ return ListenerResult.success();
+ }
+ }
InlongGroupInfo groupInfo = this.getGroupInfo(form);
String groupId = groupInfo.getInlongGroupId();
if (StringUtils.isEmpty(groupId)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
index 1edb0a8..525ef63 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SinkInfoUtils.java
@@ -120,7 +120,7 @@ public class SinkInfoUtils {
KafkaSinkResponse sinkResponse) {
List<FieldInfo> fieldInfoList = Lists.newArrayList();
if (isAllMigration) {
- fieldInfoList.add(SourceInfoUtils.getAllMigrationBuiltInField());
+ fieldInfoList = SourceInfoUtils.getAllMigrationBuiltInField();
} else {
fieldInfoList = getSinkFields(sinkResponse.getFieldList(), null);
}
@@ -192,7 +192,7 @@ public class SinkInfoUtils {
// Get the sink field, if there is no partition field in the source field, add the partition field to the end
List<FieldInfo> fieldInfoList = Lists.newArrayList();
if (isAllMigration) {
- fieldInfoList.add(SourceInfoUtils.getAllMigrationBuiltInField());
+ fieldInfoList = SourceInfoUtils.getAllMigrationBuiltInField();
} else {
fieldInfoList = getSinkFields(hiveInfo.getFieldList(), hiveInfo.getPrimaryPartition());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index f850f94..dbd4d58 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -83,9 +83,17 @@ public class SourceInfoUtils {
/**
* Get all migration built-in field for binlog source.
*/
- public static BuiltInFieldInfo getAllMigrationBuiltInField() {
- return new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
- BuiltInField.MYSQL_METADATA_DATA);
+ public static List<FieldInfo> getAllMigrationBuiltInField() {
+ List<FieldInfo> list = Lists.newArrayList();
+ list.add(new BuiltInFieldInfo("data", StringFormatInfo.INSTANCE,
+ BuiltInField.MYSQL_METADATA_DATA));
+ for (Map.Entry<String, BuiltInField> entry : BUILT_IN_FIELD_MAP.entrySet()) {
+ if (entry.getKey().equals("data_time")) {
+ continue;
+ }
+ list.add(new BuiltInFieldInfo(entry.getKey(), StringFormatInfo.INSTANCE, entry.getValue()));
+ }
+ return list;
}
/**
@@ -98,7 +106,7 @@ public class SourceInfoUtils {
List<FieldInfo> fieldInfos = Lists.newArrayList();
boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
if (isAllMigration) {
- fieldInfos.add(SourceInfoUtils.getAllMigrationBuiltInField());
+ fieldInfos = SourceInfoUtils.getAllMigrationBuiltInField();
} else {
if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
fieldInfos = getSourceFields(sinkResponse.getFieldList());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index ba29865..2c55f30 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -63,7 +63,7 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- //stop datasource
+ //delete datasource
ServiceTask deleteDataSourceTask = new ServiceTask();
deleteDataSourceTask.setName("deleteSource");
deleteDataSourceTask.setDisplayName("Group-DeleteSource");
@@ -71,7 +71,7 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
deleteDataSourceTask.addListenerProvider(serviceTaskListenerFactory);
process.addTask(deleteDataSourceTask);
- //stop sort
+ //delete sort
ServiceTask deleteSortTask = new ServiceTask();
deleteSortTask.setName("deleteSort");
deleteSortTask.setDisplayName("Group-DeleteSort");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
index 50ac17a..9a9a9f2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
@@ -65,15 +65,7 @@ public class RestartGroupWorkflowDefinition implements WorkflowDefinition {
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- //stop datasource
- ServiceTask restartDataSourceTask = new ServiceTask();
- restartDataSourceTask.setName("restartSource");
- restartDataSourceTask.setDisplayName("Group-RestartSource");
- restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
- restartDataSourceTask.addListenerProvider(serviceTaskListenerFactory);
- process.addTask(restartDataSourceTask);
-
- //stop sort
+ //restart sort
ServiceTask restartSortTask = new ServiceTask();
restartSortTask.setName("restartSort");
restartSortTask.setDisplayName("Group-RestartSort");
@@ -81,13 +73,21 @@ public class RestartGroupWorkflowDefinition implements WorkflowDefinition {
restartSortTask.addListenerProvider(serviceTaskListenerFactory);
process.addTask(restartSortTask);
+ //restart datasource
+ ServiceTask restartDataSourceTask = new ServiceTask();
+ restartDataSourceTask.setName("restartSource");
+ restartDataSourceTask.setDisplayName("Group-RestartSource");
+ restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
+ restartDataSourceTask.addListenerProvider(serviceTaskListenerFactory);
+ process.addTask(restartDataSourceTask);
+
// End node
EndEvent endEvent = new EndEvent();
process.setEndEvent(endEvent);
- startEvent.addNext(restartDataSourceTask);
- restartDataSourceTask.addNext(restartSortTask);
- restartSortTask.addNext(endEvent);
+ startEvent.addNext(restartSortTask);
+ restartSortTask.addNext(restartDataSourceTask);
+ restartDataSourceTask.addNext(endEvent);
return process;
}