You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/28 02:16:30 UTC
[incubator-inlong] branch master updated: [INLONG-3981][Manager] Fix state check of lightweight group (#3982)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6991707e5 [INLONG-3981][Manager] Fix state check of lightweight group (#3982)
6991707e5 is described below
commit 6991707e5a0a8f2c78dc8b9f726709598726f213
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Apr 28 10:16:24 2022 +0800
[INLONG-3981][Manager] Fix state check of lightweight group (#3982)
---
.../core/impl/InlongGroupProcessOperation.java | 4 ++--
.../manager/service/sort/util/ExtractNodeUtils.java | 5 ++++-
.../manager/service/sort/util/LoadNodeUtils.java | 21 +++++++++++++++++++--
.../service/sort/util/TransformNodeUtils.java | 2 +-
.../listener/light/LightGroupCompleteListener.java | 11 +++++++++++
.../listener/light/LightGroupFailedListener.java | 6 ++++++
.../light/LightGroupUpdateCompleteListener.java | 14 ++++++++++----
.../listener/light/LightGroupUpdateListener.java | 1 -
8 files changed, 53 insertions(+), 11 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
index 9d5aa3848..712a73ea8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperation.java
@@ -160,7 +160,7 @@ public class InlongGroupProcessOperation {
case LIGHT:
LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.RESTART);
executorService.execute(
- () -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, lightForm));
+ () -> workflowService.start(ProcessName.RESTART_LIGHT_GROUP_PROCESS, operator, lightForm));
break;
default:
throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
@@ -237,7 +237,7 @@ public class InlongGroupProcessOperation {
case LIGHT:
LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo,
GroupOperateType.DELETE);
- workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, lightForm);
+ workflowService.start(ProcessName.DELETE_LIGHT_GROUP_PROCESS, operator, lightForm);
break;
default:
throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 572777847..817381f34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -85,7 +85,10 @@ public class ExtractNodeUtils {
String userName = binlogSourceResponse.getUser();
String password = binlogSourceResponse.getPassword();
Integer port = binlogSourceResponse.getPort();
- Integer serverId = binlogSourceResponse.getServerId();
+ Integer serverId = null;
+ if (binlogSourceResponse.getServerId() != null && binlogSourceResponse.getServerId() > 0) {
+ serverId = binlogSourceResponse.getServerId();
+ }
String tables = binlogSourceResponse.getTableWhiteList();
List<String> tableNames = Splitter.on(",").splitToList(tables);
List<InlongStreamFieldInfo> streamFieldInfos = binlogSourceResponse.getFieldList();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index e56feafb9..928e701ce 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -72,8 +72,7 @@ public class LoadNodeUtils {
.map(sinkFieldResponse -> new FieldInfo(sinkFieldResponse.getFieldName(), name,
FieldInfoUtils.convertFieldFormat(sinkFieldResponse.getFieldType(),
sinkFieldResponse.getFieldFormat()))).collect(Collectors.toList());
- List<FieldRelationShip> fieldRelationShips = fieldInfos.stream()
- .map(fieldInfo -> new FieldRelationShip(fieldInfo, fieldInfo)).collect(Collectors.toList());
+ List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
Map<String, String> properties = kafkaSinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
Integer sinkParallelism = null;
@@ -114,4 +113,22 @@ public class LoadNodeUtils {
properties,
primaryKey);
}
+
+ public static List<FieldRelationShip> parseSinkFields(List<SinkFieldResponse> sinkFieldResponses, String sinkName) {
+ if (CollectionUtils.isEmpty(sinkFieldResponses)) {
+ return Lists.newArrayList();
+ }
+ return sinkFieldResponses.stream().map(sinkFieldResponse -> {
+ String fieldName = sinkFieldResponse.getFieldName();
+ String fieldType = sinkFieldResponse.getFieldType();
+ String fieldFormat = sinkFieldResponse.getFieldFormat();
+ FieldInfo sinkField = new FieldInfo(fieldName, sinkName,
+ FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
+ String sourceFieldName = sinkFieldResponse.getSourceFieldName();
+ String sourceFieldType = sinkFieldResponse.getSourceFieldType();
+ FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
+ FieldInfoUtils.convertFieldFormat(sourceFieldType));
+ return new FieldRelationShip(sourceField, sinkField);
+ }).collect(Collectors.toList());
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index e913229cb..8952414b7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -93,7 +93,7 @@ public class TransformNodeUtils {
throw new UnsupportedOperationException(
String.format("Unsupported deduplication strategy=%s for inlong", deDuplicationStrategy));
}
- TransformNode transformNode = createTransformNode(transformResponse);
+ TransformNode transformNode = createNormalTransformNode(transformResponse);
return new DistinctNode(transformNode.getId(),
transformNode.getName(),
transformNode.getFields(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
index 93038e480..dacb14dfc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
@@ -19,8 +19,12 @@ package org.apache.inlong.manager.service.workflow.group.listener.light;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.pojo.workflow.form.LightGroupResourceProcessForm;
import org.apache.inlong.manager.service.core.InlongGroupService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -34,6 +38,10 @@ public class LightGroupCompleteListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
+ @Autowired
+ private InlongStreamService streamService;
+ @Autowired
+ private StreamSourceService sourceService;
@Override
public ProcessEvent event() {
@@ -48,6 +56,9 @@ public class LightGroupCompleteListener implements ProcessEventListener {
// Update inlong group status and other info
groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
groupService.update(form.getGroupInfo().genRequest(), applicant);
+ // Update status of other related configs
+ streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
+ sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), applicant);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
index dd0e547b0..9a585259e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
@@ -19,8 +19,10 @@ package org.apache.inlong.manager.service.workflow.group.listener.light;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.pojo.workflow.form.LightGroupResourceProcessForm;
import org.apache.inlong.manager.service.core.InlongGroupService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -34,6 +36,8 @@ public class LightGroupFailedListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
+ @Autowired
+ private InlongStreamService streamService;
@Override
public ProcessEvent event() {
@@ -48,6 +52,8 @@ public class LightGroupFailedListener implements ProcessEventListener {
// Update inlong group status
groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), applicant);
groupService.update(form.getGroupInfo().genRequest(), applicant);
+ // Update inlong stream status
+ streamService.updateStatus(groupId, null, StreamStatus.CONFIG_FAILED.getCode(), applicant);
return ListenerResult.fail();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
index baeb2277c..bd47438b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
@@ -21,10 +21,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.LightGroupResourceProcessForm;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -40,9 +42,10 @@ public class LightGroupUpdateCompleteListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
-
@Autowired
private InlongStreamService streamService;
+ @Autowired
+ private StreamSourceService sourceService;
@Override
public ProcessEvent event() {
@@ -57,13 +60,16 @@ public class LightGroupUpdateCompleteListener implements ProcessEventListener {
GroupOperateType groupOperateType = form.getGroupOperateType();
switch (groupOperateType) {
case SUSPEND:
- groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), applicant);
+ groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), applicant);
+ sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_FROZEN.getCode(), applicant);
break;
case RESTART:
- groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), applicant);
+ groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), applicant);
+ sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), applicant);
break;
case DELETE:
- groupService.updateStatus(groupId, GroupStatus.DELETING.getCode(), applicant);
+ groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), applicant);
+ sourceService.logicDeleteAll(groupId, null, applicant);
break;
default:
break;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
index da5788390..ee77a0558 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
@@ -40,7 +40,6 @@ public class LightGroupUpdateListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
-
@Autowired
private InlongStreamService streamService;