You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/24 06:28:45 UTC

[incubator-inlong] branch master updated: [INLONG-777] InLong Manager new data stream error (#591)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5057170  [INLONG-777] InLong Manager new data stream error (#591)
5057170 is described below

commit 505717024cd6acd1416ec0536a084fa0098cd8da
Author: healchow <he...@gmail.com>
AuthorDate: Sat Jul 24 14:28:38 2021 +0800

    [INLONG-777] InLong Manager new data stream error (#591)
    
    Co-authored-by: healzhou <he...@tencent.com>
---
 inlong-manager/doc/sql/apache_inlong_manager.sql   | 20 ++++++---
 .../core/impl/WorkflowApproverServiceImpl.java     | 16 +++----
 .../service/workflow/WorkflowDefinition.java       |  4 +-
 .../service/workflow/WorkflowServiceImpl.java      |  4 +-
 .../CreateResourceWorkflowDefinition.java          |  8 ++--
 .../newbusiness/NewBusinessWorkflowDefinition.java | 19 ++++----
 .../NewConsumptionWorkflowDefinition.java          | 10 ++---
 .../newstream/SingleStreamWorkflowDefinition.java  |  8 ++--
 .../workflow/core/processor/UserTaskProcessor.java | 32 ++++++--------
 .../workflow/model/definition/ApproverAssign.java  |  8 ++--
 .../model/definition/FixedApproverAssign.java      | 50 ----------------------
 .../workflow/model/definition/UserTask.java        |  2 +
 12 files changed, 68 insertions(+), 113 deletions(-)

diff --git a/inlong-manager/doc/sql/apache_inlong_manager.sql b/inlong-manager/doc/sql/apache_inlong_manager.sql
index 4e8fadb..7226f0c 100644
--- a/inlong-manager/doc/sql/apache_inlong_manager.sql
+++ b/inlong-manager/doc/sql/apache_inlong_manager.sql
@@ -275,8 +275,9 @@ CREATE TABLE `data_schema`
   AUTO_INCREMENT = 10
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data format table';
 
-INSERT INTO `data_schema` (id, name, agent_type, data_generate_rule, sort_type, time_offset)
-values (1, 'm0_day', 'file_agent', 'day', 0, '-0d');
+-- create default data schema
+INSERT INTO `data_schema` (name, agent_type, data_generate_rule, sort_type, time_offset)
+values ('m0_day', 'file_agent', 'day', 0, '-0d');
 
 -- ----------------------------
 -- Table structure for data_source_cmd_config
@@ -710,10 +711,9 @@ CREATE TABLE `user`
   DEFAULT CHARSET = utf8mb4 COMMENT ='User table';
 
 -- create default admin user, username is 'admin', password is 'inlong'
-INSERT INTO `user` (id, name, password, account_type, due_date, create_time, update_time, create_by, update_by)
-values (1, 'admin', '628ed559bff5ae36bd2184d4216973cf', 0,
-        '2099-12-31 23:59:59', '2021-07-01 10:10:10',
-        '2021-07-01 10:10:10', 'inlong_init', NULL);
+INSERT INTO `user` (name, password, account_type, due_date, create_time, update_time, create_by, update_by)
+VALUES ('admin', '628ed559bff5ae36bd2184d4216973cf', 0, '2099-12-31 23:59:59',
+        CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 'inlong_init', 'inlong_init');
 
 -- ----------------------------
 -- Table structure for user_role
@@ -757,6 +757,14 @@ CREATE TABLE `wf_approver`
   AUTO_INCREMENT = 5
   DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow approver table';
 
+-- create default approver for new consumption and new business
+INSERT INTO `wf_approver`(`process_name`, `task_name`, `filter_key`, `filter_value`, `approvers`,
+                          `creator`, `modifier`, `create_time`, `modify_time`, `is_deleted`)
+VALUES ('NEW_CONSUMPTION_WORKFLOW', 'ut_admin', 'DEFAULT', NULL, 'admin',
+        'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0),
+       ('NEW_BUSINESS_WORKFLOW', 'ut_admin', 'DEFAULT', NULL, 'admin',
+        'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0);
+
 -- ----------------------------
 -- Table structure for wf_event_log
 -- ----------------------------
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
index 34b3637..994119d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
@@ -50,24 +50,20 @@ public class WorkflowApproverServiceImpl implements WorkflowApproverService {
 
     @Autowired
     private WorkflowApproverEntityMapper workflowApproverMapper;
-
     @Autowired
     private WorkflowEngine workflowEngine;
 
-
     @Override
     public List<String> getApprovers(String processName, String taskName, WorkflowApproverFilterContext context) {
-        List<WorkflowApproverEntity> configs = workflowApproverMapper
-                .selectByQuery(WorkflowApproverQuery.builder()
-                        .processName(processName)
-                        .taskName(taskName)
-                        .build());
-
-        Map<FilterKey, String> filterKey2ValueMap = context.toFilterKeyMap();
-
+        WorkflowApproverQuery approverQuery = WorkflowApproverQuery.builder()
+                .processName(processName)
+                .taskName(taskName)
+                .build();
+        List<WorkflowApproverEntity> configs = workflowApproverMapper.selectByQuery(approverQuery);
         Map<String, List<WorkflowApproverEntity>> groupByFilterKey = configs.stream()
                 .collect(Collectors.groupingBy(WorkflowApproverEntity::getFilterKey));
 
+        Map<FilterKey, String> filterKey2ValueMap = context.toFilterKeyMap();
         return FilterKey.getFilterKeyByOrder()
                 .stream()
                 .map(FilterKey::name)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinition.java
index ccee06f..2de9dd4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinition.java
@@ -29,13 +29,13 @@ public interface WorkflowDefinition {
      *
      * @return defined process
      */
-    Process define();
+    Process defineProcess();
 
     /**
      * Get process name
      *
      * @return process name
      */
-    ProcessName getName();
+    ProcessName getProcessName();
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index 8e4ab72..debe025 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -79,8 +79,8 @@ public class WorkflowServiceImpl implements WorkflowService {
     private void init() {
         log.info("start init workflow service");
         workflowDefinitions.forEach(definition -> {
-            workflowEngine.processDefinitionService().register(definition.define());
-            log.info("success register workflow definition: {}", definition.getName());
+            workflowEngine.processDefinitionService().register(definition.defineProcess());
+            log.info("success register workflow definition: {}", definition.getProcessName());
         });
         log.info("success init workflow service");
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
index 9c81be9..7646fd1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
@@ -77,7 +77,7 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
     private DataStreamEntityMapper streamMapper;
 
     @Override
-    public Process define() {
+    public Process defineProcess() {
 
         // Configuration process
         Process process = new Process();
@@ -86,8 +86,8 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
         process.addListener(createResourceFailedProcessListener);
 
         process.setType("Business Resource Creation");
-        process.setName(getName().name());
-        process.setDisplayName(getName().getDisplayName());
+        process.setName(getProcessName().name());
+        process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(CreateResourceWorkflowForm.class);
         process.setVersion(1);
         process.setHidden(true);
@@ -170,7 +170,7 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
     }
 
     @Override
-    public ProcessName getName() {
+    public ProcessName getProcessName() {
         return ProcessName.CREATE_BUSINESS_RESOURCE;
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
index c403c67..69372f6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
@@ -51,12 +51,12 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
     private WorkflowApproverService workflowApproverService;
 
     @Override
-    public Process define() {
+    public Process defineProcess() {
         // Configuration process
         Process process = new Process();
-        process.setType(getName().getDisplayName());
-        process.setName(getName().name());
-        process.setDisplayName(getName().getDisplayName());
+        process.setType(getProcessName().getDisplayName());
+        process.setName(getProcessName().name());
+        process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(NewBusinessWorkflowForm.class);
         process.setVersion(1);
 
@@ -87,20 +87,23 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
         // Configuration order relationship
         startEvent.addNext(adminUserTask);
         // If you need another approval process, you can add it here
-
         adminUserTask.addNext(endEvent);
 
         return process;
     }
 
     @Override
-    public ProcessName getName() {
+    public ProcessName getProcessName() {
         return ProcessName.NEW_BUSINESS_WORKFLOW;
     }
 
+    /**
+     * Get task approvers by task name
+     */
     private List<String> getTaskApprovers(String taskName) {
-        return workflowApproverService.getApprovers(getName().name(), taskName,
-                WorkflowApproverFilterContext.builder().build());
+        String processName = this.getProcessName().name();
+        WorkflowApproverFilterContext context = new WorkflowApproverFilterContext();
+        return workflowApproverService.getApprovers(processName, taskName, context);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
index fe809ae..0abc568 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
@@ -71,13 +71,13 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
     private BusinessService businessService;
 
     @Override
-    public Process define() {
+    public Process defineProcess() {
 
         // Define process information
         Process process = new Process();
         process.setType("Data Consumption Resource Creation");
-        process.setName(getName().name());
-        process.setDisplayName(getName().getDisplayName());
+        process.setName(getProcessName().name());
+        process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(NewConsumptionWorkflowForm.class);
         process.setVersion(1);
         process.setProcessDetailHandler(newConsumptionProcessDetailHandler);
@@ -123,7 +123,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
         NewConsumptionWorkflowForm form = (NewConsumptionWorkflowForm) context.getProcessForm();
         ConsumptionInfo consumptionInfo = Optional.ofNullable(form.getConsumptionInfo())
                 .orElseGet(ConsumptionInfo::new);
-        return workflowApproverService.getApprovers(getName().name(), UT_ADMINT_NAME,
+        return workflowApproverService.getApprovers(getProcessName().name(), UT_ADMINT_NAME,
                 new WorkflowApproverFilterContext());
     }
 
@@ -139,7 +139,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
     }
 
     @Override
-    public ProcessName getName() {
+    public ProcessName getProcessName() {
         return ProcessName.NEW_CONSUMPTION_WORKFLOW;
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java
index 410624f..a1f1fcb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java
@@ -57,7 +57,7 @@ public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
     private PushHiveConfigToSortEventListener pushHiveConfigToSortEventListener;
 
     @Override
-    public Process define() {
+    public Process defineProcess() {
         // Configuration process
         Process process = new Process();
         process.addListener(initBusinessInfoListener);
@@ -65,8 +65,8 @@ public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
         process.addListener(singleStreamCompleteProcessListener);
 
         process.setType("Data stream access resource creation");
-        process.setName(getName().name());
-        process.setDisplayName(getName().getDisplayName());
+        process.setName(getProcessName().name());
+        process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(CreateResourceWorkflowForm.class);
         process.setVersion(1);
         process.setHidden(true);
@@ -112,7 +112,7 @@ public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
     }
 
     @Override
-    public ProcessName getName() {
+    public ProcessName getProcessName() {
         return ProcessName.CREATE_DATASTREAM_RESOURCE;
     }
 }
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
index 409c9a1..b83c99b 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
@@ -20,7 +20,16 @@ package org.apache.inlong.manager.workflow.core.processor;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.workflow.core.WorkflowDataAccessor;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventNotifier;
@@ -33,18 +42,6 @@ import org.apache.inlong.manager.workflow.model.definition.Element;
 import org.apache.inlong.manager.workflow.model.definition.UserTask;
 import org.apache.inlong.manager.workflow.model.instance.ProcessInstance;
 import org.apache.inlong.manager.workflow.model.instance.TaskInstance;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
 
 /**
  * User task processor
@@ -57,7 +54,7 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
     private static final Set<Action> SUPPORT_ACTIONS = ImmutableSet.of(
             Action.APPROVE, Action.REJECT, Action.TRANSFER, Action.CANCEL, Action.TERMINATE
     );
-    private TaskEventNotifier taskEventNotifier;
+    private final TaskEventNotifier taskEventNotifier;
 
     public UserTaskProcessor(WorkflowDataAccessor workflowDataAccessor, WorkflowEventNotifier workflowEventNotifier) {
         super(workflowDataAccessor);
@@ -71,15 +68,15 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
 
     @Override
     public void create(UserTask userTask, WorkflowContext context) {
-        ProcessInstance processInstance = context.getProcessInstance();
-
         List<String> approvers = userTask.getApproverAssign().assign(context);
-        Preconditions.checkNotEmpty(approvers, "can't assign approvers for task :" + userTask.getDisplayName());
+        Preconditions.checkNotEmpty(approvers, "can't assign approvers for task: " + userTask.getDisplayName()
+                + ", as the approvers in table `wf_approver` was empty");
 
         if (!userTask.isNeedAllApprove()) {
             approvers = Collections.singletonList(StringUtils.join(approvers, TaskInstance.APPROVERS_DELIMITER));
         }
 
+        ProcessInstance processInstance = context.getProcessInstance();
         approvers.stream()
                 .map(approver -> createTaskInstance(userTask, processInstance, approver))
                 .forEach(context.getNewTaskInstances()::add);
@@ -105,7 +102,6 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
         checkOperator(actionContext);
         completeTaskInstance(actionContext);
 
-
         this.taskEventNotifier.notify(toTaskEvent(actionContext.getAction()), context);
         return true;
     }
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/ApproverAssign.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/ApproverAssign.java
index d67d2de..eb893d8 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/ApproverAssign.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/ApproverAssign.java
@@ -17,10 +17,9 @@
 
 package org.apache.inlong.manager.workflow.model.definition;
 
-import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
 import java.util.Collections;
 import java.util.List;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
 
 /**
  * Approver assignment
@@ -31,8 +30,8 @@ public interface ApproverAssign {
     /**
      * The default system automatic approval
      */
-    ApproverAssign DEFAULT_SYSTEM_APPROVER = c -> Collections.singletonList("SYSTEM");
-    ApproverAssign DEFAULT_SKIP_APPROVER = c -> Collections.singletonList("SKIP");
+    ApproverAssign DEFAULT_SYSTEM_APPROVER = context -> Collections.singletonList("SYSTEM");
+    ApproverAssign DEFAULT_SKIP_APPROVER = context -> Collections.singletonList("SKIP");
 
     /**
      * Designated approver
@@ -40,4 +39,5 @@ public interface ApproverAssign {
      * @return List of Approvers
      */
     List<String> assign(WorkflowContext context);
+
 }
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/FixedApproverAssign.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/FixedApproverAssign.java
deleted file mode 100644
index bcbd2ab..0000000
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/FixedApproverAssign.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow.model.definition;
-
-import com.google.common.collect.Lists;
-
-import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import java.util.List;
-
-/**
- * Fixed approver
- */
-public class FixedApproverAssign implements ApproverAssign {
-
-    private List<String> approvers;
-
-    public FixedApproverAssign(List<String> approvers) {
-        this.approvers = approvers;
-    }
-
-    public static ApproverAssign of(List<String> approvers) {
-        return new FixedApproverAssign(approvers);
-    }
-
-    public static ApproverAssign of(String... approvers) {
-        return new FixedApproverAssign(Lists.newArrayList(approvers));
-    }
-
-    @Override
-    public List<String> assign(WorkflowContext context) {
-        return approvers;
-    }
-
-}
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/UserTask.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/UserTask.java
index a858d95..dd950cd 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/UserTask.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/model/definition/UserTask.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.workflow.exception.WorkflowException;
@@ -32,6 +33,7 @@ import org.apache.inlong.manager.workflow.model.WorkflowContext;
  * User task
  */
 @Data
+@EqualsAndHashCode(callSuper = true)
 @NoArgsConstructor
 public class UserTask extends Task {