You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2023/10/11 02:09:11 UTC

[dolphinscheduler] branch revert-14986-fix/duplicate_event_319 created (now aac45af68d)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a change to branch revert-14986-fix/duplicate_event_319
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


      at aac45af68d Revert "fix duplicate event (#14986)"

This branch includes the following new commits:

     new aac45af68d Revert "fix duplicate event (#14986)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[dolphinscheduler] 01/01: Revert "fix duplicate event (#14986)"

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch revert-14986-fix/duplicate_event_319
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit aac45af68d49dedd10bfa010dbaa0854a4dd7322
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Wed Oct 11 10:09:05 2023 +0800

    Revert "fix duplicate event (#14986)"
    
    This reverts commit 7a38b87c9a9097465ff5966ebc43cf55f4bdf75c.
---
 .../master/runner/StateWheelExecuteThread.java     | 20 ++++-------
 .../master/runner/WorkflowExecuteRunnable.java     | 41 ++++++----------------
 .../master/runner/WorkflowExecuteThreadPool.java   | 25 +++----------
 3 files changed, 23 insertions(+), 63 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 5f526115d5..04d4dc5621 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -17,11 +17,12 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
+import lombok.NonNull;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -34,20 +35,16 @@ import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
 import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
-
-import java.util.Optional;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import javax.annotation.PostConstruct;
-
-import lombok.NonNull;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Lazy;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PostConstruct;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 /**
  * Check thread
  * 1. timeout task check
@@ -404,10 +401,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                 .type(StateEventType.TASK_STATE_CHANGE)
                 .status(TaskExecutionStatus.RUNNING_EXECUTION)
                 .build();
-        // will skip submit check event if existed, avoid event stacking
-        if (!workflowExecuteThreadPool.existStateEvent(stateEvent)) {
-            workflowExecuteThreadPool.submitStateEvent(stateEvent);
-        }
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
     }
 
     private void addProcessStopEvent(ProcessInstance processInstance) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 50bfc2099c..8bbd0bb064 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -88,7 +88,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 import org.apache.dolphinscheduler.service.utils.DagHelper;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
-
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
@@ -292,19 +291,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             } catch (StateEventHandleError stateEventHandleError) {
                 logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
                 this.stateEvents.remove(stateEvent);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (StateEventHandleException stateEventHandleException) {
                 logger.error("State event handle error, will retry this event: {}",
                         stateEvent,
                         stateEventHandleException);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
                 // we catch the exception here, since if the state event handle failed, the state event will still keep
                 // in the stateEvents queue.
                 logger.error("State event handle error, get a unknown exception, will retry this event: {}",
                         stateEvent,
                         e);
-                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
             }
@@ -323,18 +322,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         return key;
     }
 
-    public boolean existStateEvent(StateEvent stateEvent) {
-        if (CollectionUtils.isNotEmpty(this.stateEvents)) {
-            Optional<StateEvent> optional = this.stateEvents.stream()
-                    .filter(e -> e.getProcessInstanceId() == stateEvent.getProcessInstanceId()
-                            && Objects.equals(e.getTaskInstanceId(), stateEvent.getTaskInstanceId())
-                            && e.getType() == stateEvent.getType())
-                    .findFirst();
-            return optional.isPresent();
-        }
-        return false;
-    }
-
     public boolean addStateEvent(StateEvent stateEvent) {
         if (processInstance.getId() != stateEvent.getProcessInstanceId()) {
             logger.info("state event would be abounded :{}", stateEvent);
@@ -619,8 +606,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         Date scheduleDate = processInstance.getScheduleTime();
         if (scheduleDate == null) {
             if (CollectionUtils.isEmpty(complementListDate)) {
-                logger.info("complementListDate is empty, process complement end. process id:{}",
-                        processInstance.getId());
+                logger.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId());
 
                 return true;
             }
@@ -845,8 +831,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                             task.getTaskCode(),
                             task.getState());
                     if (validTaskMap.containsKey(task.getTaskCode())) {
-                        logger.warn(
-                                "Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
+                        logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
                                 task.getTaskCode());
                         int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
                         TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
@@ -995,8 +980,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                         taskInstance.getProcessInstanceId(),
                         taskInstance.getTaskGroupPriority());
                 if (!acquireTaskGroup) {
-                    logger.info(
-                            "Submitted task will not be dispatch right now because the first time to try to acquire" +
+                    logger.info("Submitted task will not be dispatch right now because the first time to try to acquire" +
                                     " task group failed, taskInstanceName: {}, taskGroupId: {}",
                             taskInstance.getName(), taskGroupId);
                     return Optional.of(taskInstance);
@@ -1005,8 +989,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
 
             boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
             if (!dispatchSuccess) {
-                logger.error("Dispatch standby process {} task {} failed", processInstance.getName(),
-                        taskInstance.getName());
+                logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), taskInstance.getName());
                 return Optional.empty();
             }
             taskProcessor.action(TaskAction.RUN);
@@ -1463,10 +1446,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             List<String> nextTaskList =
                     DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
             if (!nextTaskList.contains(nextNodeName)) {
-                logger.info(
-                        "DependTask is a condition task, and its next condition branch does not hava current task, " +
-                                "dependTaskCode: {}, currentTaskCode: {}",
-                        dependNodeName, nextNodeName);
+                logger.info("DependTask is a condition task, and its next condition branch does not hava current task, " +
+                                "dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName
+                        );
                 return false;
             }
         } else {
@@ -1842,8 +1824,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
                 if (retryTask != null && retryTask.getState().isForceSuccess()) {
                     task.setState(retryTask.getState());
-                    logger.info(
-                            "Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
+                    logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
                             task.getName(), task.getId());
                     removeTaskFromStandbyList(task);
                     completeTaskMap.put(task.getTaskCode(), task.getId());
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 510e27297a..e26df60244 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
+import com.google.common.base.Strings;
+import lombok.NonNull;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
@@ -32,14 +34,6 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.utils.LoggerUtils;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.PostConstruct;
-
-import lombok.NonNull;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -48,7 +42,9 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.util.concurrent.ListenableFutureCallback;
 
-import com.google.common.base.Strings;
+import javax.annotation.PostConstruct;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Used to execute {@link WorkflowExecuteRunnable}.
@@ -86,17 +82,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         this.setCorePoolSize(masterConfig.getExecThreads());
     }
 
-    public boolean existStateEvent(StateEvent stateEvent) {
-        WorkflowExecuteRunnable workflowExecuteThread =
-                processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
-        if (workflowExecuteThread == null) {
-            logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}",
-                    stateEvent);
-            return false;
-        }
-        return workflowExecuteThread.existStateEvent(stateEvent);
-    }
-
     /**
      * submit state event
      */