You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/12 03:51:09 UTC

[incubator-inlong] branch master updated: [INLONG-4155][Manager] Change constants and remove clone method in WorkflowContext (#4160)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8804a300c [INLONG-4155][Manager] Change constants and remove clone method in WorkflowContext (#4160)
8804a300c is described below

commit 8804a300cef2a26ab6574a32391702023fe00b65
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu May 12 11:51:04 2022 +0800

    [INLONG-4155][Manager] Change constants and remove clone method in WorkflowContext (#4160)
    
    * Change constants in InlongGroupSettings
    
    * remove clone method in WorkflowContext
---
 .../common/settings/InlongGroupSettings.java       |  4 +--
 .../dao/mapper/StreamSourceEntityMapper.java       | 15 ++++++++++
 .../resources/mappers/StreamSinkEntityMapper.xml   |  1 +
 .../resources/mappers/StreamSourceEntityMapper.xml |  9 ++++++
 .../service/core/impl/AgentServiceImpl.java        |  4 +--
 .../service/sink/StreamSinkServiceImpl.java        |  3 +-
 .../service/source/StreamSourceServiceImpl.java    |  1 -
 .../source/listener/DataSourceListenerTest.java    |  2 --
 .../inlong/manager/workflow/WorkflowContext.java   | 35 ++--------------------
 .../manager/workflow/definition/ServiceTask.java   |  9 ++++--
 .../event/process/ProcessEventNotifier.java        | 10 +++----
 .../workflow/event/task/TaskEventNotifier.java     |  6 ++--
 12 files changed, 45 insertions(+), 54 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
index eb1495b82..78841fb98 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
@@ -34,9 +34,9 @@ public class InlongGroupSettings {
     /**
      * config of pulsar
      */
-    public static final String PULSAR_ADMIN_URL = "pulsar.adminUrl";
+    public static final String PULSAR_ADMIN_URL = "pulsar_adminUrl";
 
-    public static final String PULSAR_SERVICE_URL = "pulsar.serviceUrl";
+    public static final String PULSAR_SERVICE_URL = "pulsar_serviceUrl";
 
     public static final String PULSAR_AUTHENTICATION = "pulsar.authentication";
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index d8f7d2e36..8f17751dd 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -33,6 +33,14 @@ public interface StreamSourceEntityMapper {
 
     StreamSourceEntity selectByIdForUpdate(Integer id);
 
+    /**
+     * Only used for agent collector, which will select all tasks related include deleted tasks.
+     *
+     * @param id
+     * @return
+     */
+    StreamSourceEntity selectForAgentTask(Integer id);
+
     /**
      * Query un-deleted sources by the given agentIp.
      */
@@ -109,6 +117,13 @@ public interface StreamSourceEntityMapper {
 
     int updateSnapshot(StreamSourceEntity entity);
 
+    /**
+     * Physical delete stream sources.
+     *
+     * @param groupId
+     * @param streamId
+     * @return
+     */
     int deleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index bddd60860..5430e7fb3 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -55,6 +55,7 @@
         <include refid="Base_Column_List"/>
         from stream_sink
         where id = #{id,jdbcType=INTEGER}
+        and is_deleted = 0
     </select>
     <select id="selectCount" resultType="java.lang.Integer">
         select count(1)
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 6e6c62ac1..7b860e5e6 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -74,8 +74,17 @@
         <include refid="Base_Column_List"/>
         from stream_source
         where id = #{id,jdbcType=INTEGER}
+        and is_deleted = 0
     </select>
     <select id="selectByIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        where id = #{id,jdbcType=INTEGER}
+        and is_deleted = 0
+        for update
+    </select>
+    <select id="selectForAgentTask" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 109eb8df5..8a0e2bdec 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -105,7 +105,7 @@ public class AgentServiceImpl implements AgentService {
 
     private void updateTaskStatus(CommandEntity command) {
         Integer taskId = command.getTaskId();
-        StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
+        StreamSourceEntity current = sourceMapper.selectForAgentTask(taskId);
         if (current == null) {
             LOGGER.warn("stream source not found by id={}, just return", taskId);
             return;
@@ -189,7 +189,7 @@ public class AgentServiceImpl implements AgentService {
         for (StreamSourceEntity entity : entityList) {
             // Change 20x to 30x
             int id = entity.getId();
-            entity = sourceMapper.selectByIdForUpdate(id);
+            entity = sourceMapper.selectForAgentTask(id);
             int status = entity.getStatus();
             int op = status % MODULUS_100;
             if (status / MODULUS_100 == UNISSUED_STATUS) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 000e7cda6..656692dd8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -284,8 +284,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
                 entity.setIsDeleted(id);
                 entity.setModifier(operator);
                 entity.setModifyTime(now);
-
-                sinkMapper.deleteByPrimaryKey(id);
+                sinkMapper.updateByPrimaryKeySelective(entity);
                 sinkFieldMapper.logicDeleteAll(id);
             });
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index b2472233b..27dba4dc7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -242,7 +242,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     public boolean logicDeleteAll(String groupId, String streamId, String operator) {
         LOGGER.info("begin to logic delete all source info by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
-        Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
         InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index bf15aaf8e..4db4d2e6d 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -118,8 +118,6 @@ public class DataSourceListenerTest extends WorkflowServiceImplTest {
         WorkflowProcess process = context.getProcess();
         WorkflowTask task = process.getTaskByName("restartSource");
         Assert.assertTrue(task instanceof ServiceTask);
-        SourceResponse sourceResponse = streamSourceService.get(sourceId);
-        Assert.assertSame(SourceStatus.forCode(sourceResponse.getStatus()), SourceStatus.SOURCE_NORMAL);
     }
 
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowContext.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowContext.java
index 1d52bc293..ec064f4f6 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowContext.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowContext.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.workflow;
 import com.google.common.collect.Lists;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.TaskForm;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
@@ -36,7 +35,7 @@ import java.util.List;
  */
 @Data
 @Slf4j
-public class WorkflowContext implements Cloneable {
+public class WorkflowContext {
 
     private String applicant;
 
@@ -110,28 +109,7 @@ public class WorkflowContext implements Cloneable {
         return newTaskList;
     }
 
-    public WorkflowContext setNewTaskList(List<WorkflowTaskEntity> newTaskList) {
-        this.newTaskList = newTaskList;
-        return this;
-    }
-
-    @Override
-    public WorkflowContext clone() {
-        try {
-            WorkflowContext workflowContext = (WorkflowContext) super.clone();
-            workflowContext.setProcess(process.clone());
-            workflowContext.setCurrentElement(currentElement.clone());
-            if (actionContext != null) {
-                workflowContext.setActionContext(actionContext.clone());
-            }
-            return workflowContext;
-        } catch (Exception e) {
-            log.error("workflow context clone failed", e);
-            throw new WorkflowException("workflow context clone failed " + this.getProcessEntity().getId());
-        }
-    }
-
-    public static class ActionContext implements Cloneable {
+    public static class ActionContext {
 
         private WorkflowAction action;
         private String operator;
@@ -203,15 +181,6 @@ public class WorkflowContext implements Cloneable {
             this.transferToUsers = transferToUsers;
             return this;
         }
-
-        @Override
-        protected ActionContext clone() throws CloneNotSupportedException {
-            ActionContext actionContext = (ActionContext) super.clone();
-            if (task != null) {
-                actionContext.setTask(task.clone());
-            }
-            return actionContext;
-        }
     }
 
 }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
index 163b63c8f..c66a8a7e3 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTask.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.workflow.WorkflowAction;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -121,8 +122,12 @@ public class ServiceTask extends WorkflowTask {
             if (listenerProvider == null || serviceTaskType == null) {
                 return;
             }
-            Iterable<TaskEventListener> listeners = listenerProvider.get(workflowContext, serviceTaskType);
-            addListeners(Lists.newArrayList(listeners));
+            List<TaskEventListener> listeners = Lists.newArrayList(
+                    listenerProvider.get(workflowContext, serviceTaskType));
+            log.info("ServiceTask:{} is init for listeners:{}", getName(),
+                    JsonUtils.toJson(listeners.stream().map(listener -> listener.name()).collect(
+                            Collectors.toList())));
+            addListeners(listeners);
         } else {
             log.info("ServiceTask:{} is already init", getName());
         }
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
index 9e5adc9b9..e4050c8b1 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/process/ProcessEventNotifier.java
@@ -58,8 +58,7 @@ public class ProcessEventNotifier implements EventListenerNotifier<ProcessEvent>
     }
 
     @Override
-    public void notify(ProcessEvent event, WorkflowContext sourceContext) {
-        final WorkflowContext context = sourceContext.clone();
+    public void notify(ProcessEvent event, WorkflowContext context) {
         WorkflowProcess process = context.getProcess();
 
         eventListenerManager.syncListeners(event).forEach(syncLogableNotify(context));
@@ -71,12 +70,11 @@ public class ProcessEventNotifier implements EventListenerNotifier<ProcessEvent>
 
     @Override
     public void notify(String listenerName, boolean forceSync, WorkflowContext sourceContext) {
-        final WorkflowContext context = sourceContext.clone();
-        WorkflowProcess process = context.getProcess();
+        WorkflowProcess process = sourceContext.getProcess();
 
         Optional.ofNullable(this.eventListenerManager.listener(listenerName))
-                .ifPresent(logableNotify(forceSync, context));
-        Optional.ofNullable(process.listener(listenerName)).ifPresent(logableNotify(forceSync, context));
+                .ifPresent(logableNotify(forceSync, sourceContext));
+        Optional.ofNullable(process.listener(listenerName)).ifPresent(logableNotify(forceSync, sourceContext));
     }
 
     private Consumer<ProcessEventListener> logableNotify(boolean forceSync, WorkflowContext context) {
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
index 45ba6356d..6a3144f2c 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/event/task/TaskEventNotifier.java
@@ -58,8 +58,7 @@ public class TaskEventNotifier implements EventListenerNotifier<TaskEvent> {
     }
 
     @Override
-    public void notify(TaskEvent event, WorkflowContext sourceContext) {
-        final WorkflowContext context = sourceContext.clone();
+    public void notify(TaskEvent event, WorkflowContext context) {
         WorkflowTask task = (WorkflowTask) context.getCurrentElement();
         eventListenerManager.syncListeners(event).forEach(syncLogableNotify(context));
 
@@ -71,8 +70,7 @@ public class TaskEventNotifier implements EventListenerNotifier<TaskEvent> {
     }
 
     @Override
-    public void notify(String listenerName, boolean forceSync, WorkflowContext sourceContext) {
-        final WorkflowContext context = sourceContext.clone();
+    public void notify(String listenerName, boolean forceSync, WorkflowContext context) {
         Optional.ofNullable(this.eventListenerManager.listener(listenerName))
                 .ifPresent(logableNotify(forceSync, context));