Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/12/22 19:55:42 UTC

[GitHub] [dolphinscheduler] zwZjut opened a new pull request #7560: #7450

zwZjut opened a new pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560


   <!--Thanks very much for contributing to Apache DolphinScheduler. Please review https://dolphinscheduler.apache.org/en-us/community/development/pull-request.html before opening a pull request.-->
   
   
   ## Purpose of the pull request
   
   <!--(For example: This pull request adds checkstyle plugin).-->
   
   ## Brief change log
   
   <!--*(for example:)*
     - *Add maven-checkstyle-plugin to root pom.xml*
   -->
   ## Verify this pull request
   
   <!--*(Please pick either of the following options)*-->
   
   This pull request is code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   <!--*(example:)*
     - *Added dolphinscheduler-dao tests for end-to-end.*
     - *Added CronUtilsTest to verify the change.*
     - *Manually verified the change by testing locally.* -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [dolphinscheduler] caishunfeng commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774277894



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
##########
@@ -0,0 +1,159 @@
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
+public class TaskResponsePersistThread implements Runnable {
+
+    /**
+     * logger of TaskResponsePersistThread
+     */
+    private static final Logger logger = LoggerFactory.getLogger(TaskResponsePersistThread.class);
+
+    private ConcurrentLinkedQueue<TaskResponseEvent> events = new ConcurrentLinkedQueue<>();
+
+    private final Integer processInstanceId;
+
+    /**
+     * process service
+     */
+    private ProcessService processService;
+
+    private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
+
+    public TaskResponsePersistThread(ProcessService processService,
+                                     ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper,
+                                     Integer processInstanceId) {
+        this.processService = processService;
+        this.processInstanceMapper = processInstanceMapper;
+        this.processInstanceId = processInstanceId;
+    }
+
+    @Override
+    public void run() {
+        while (!this.events.isEmpty()) {
+            TaskResponseEvent event = this.events.peek();
+            try {
+                persist(event);
+            } catch (Exception e) {
+                logger.error("persist error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId(), e);
+            } finally {
+                this.events.remove(event);

Review comment:
       If an exception occurs, should the event still be removed?
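
One way to address the question above is to remove an event only after it has been persisted, and to leave a failed event at the head of the queue so the next run retries it. The following is a minimal sketch under stated assumptions, not the code from this pull request: `RetryingDrain`, its `Event` class, `MAX_ATTEMPTS`, and the `persist` callback are hypothetical stand-ins for `TaskResponsePersistThread`, `TaskResponseEvent`, and `persist(event)`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class RetryingDrain {

    /** Hypothetical stand-in for TaskResponseEvent: an id plus a failure counter. */
    static final class Event {
        final int taskInstanceId;
        int attempts;

        Event(int taskInstanceId) {
            this.taskInstanceId = taskInstanceId;
        }
    }

    /** Assumed retry limit; the thread does not specify one. */
    static final int MAX_ATTEMPTS = 3;

    /**
     * Persists events in queue order. An event is removed only after it was
     * persisted, or after it failed MAX_ATTEMPTS times.
     *
     * @return true if the queue was fully drained, false if the run stopped
     *         early so that the head event can be retried on the next run
     */
    static boolean drain(Queue<Event> events, Consumer<Event> persist) {
        Event event;
        while ((event = events.peek()) != null) {
            try {
                persist.accept(event);
            } catch (RuntimeException e) {
                event.attempts++;
                if (event.attempts < MAX_ATTEMPTS) {
                    // Keep the event at the head so ordering is preserved.
                    return false;
                }
                // Retry budget exhausted: fall through and drop the event.
            }
            // Assumes a single consumer per queue, so poll() removes the
            // same event that peek() returned.
            events.poll();
        }
        return true;
    }

    public static void main(String[] args) {
        Queue<Event> events = new ConcurrentLinkedQueue<>();
        events.add(new Event(1));
        events.add(new Event(2));
        boolean drained = drain(events, e -> System.out.println("persisted task " + e.taskInstanceId));
        System.out.println("drained: " + drained);
    }
}
```

Stopping at the failed event keeps the per-process-instance ordering intact (for example, an ACK is never persisted after its RESULT); the trade-off is that one failing event delays the events behind it until it succeeds or its retry budget runs out.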







[GitHub] [dolphinscheduler] zwZjut commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
zwZjut commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774588729



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
##########
@@ -0,0 +1,172 @@
+package org.apache.dolphinscheduler.server.master.processor.queue;

Review comment:
       fixed








[GitHub] [dolphinscheduler] geosmart commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
geosmart commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774823110



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
##########
@@ -142,84 +175,72 @@ public void run() {
     }
 
     /**
-     * persist  taskResponseEvent
-     *
-     * @param taskResponseEvent taskResponseEvent
+     * event handler thread
      */
-    private void persist(TaskResponseEvent taskResponseEvent) {
-        Event event = taskResponseEvent.getEvent();
-        Channel channel = taskResponseEvent.getChannel();
+    class EventHandler extends Thread {

Review comment:
       The name could be more specific, e.g. `TaskResponseEventHandler`.
   
   Also, the comment `event handler thread` says nothing; either remove it or describe what the thread does.







[GitHub] [dolphinscheduler] sonarcloud[bot] removed a comment on pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#issuecomment-1000366417


   SonarCloud Quality Gate failed.
   
   - [24 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=7560&resolved=false&types=BUG) (rating E)
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=7560&resolved=false&types=VULNERABILITY) (rating A)
   - [3 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=7560&resolved=false&types=SECURITY_HOTSPOT) (rating E)
   - [321 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=7560&resolved=false&types=CODE_SMELL) (rating A)
   
   - [16.8% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=7560&metric=new_coverage&view=list)
   - [0.9% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=7560&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [dolphinscheduler] caishunfeng merged pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
caishunfeng merged pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560


   





[GitHub] [dolphinscheduler] codecov-commenter commented on pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#issuecomment-1000609217


   # [Codecov](https://codecov.io/gh/apache/dolphinscheduler/pull/7560) Report
   > :exclamation: No coverage uploaded for pull request base (`2.0.2-prepare@1693405`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   ```diff
   @@               Coverage Diff                @@
   ##             2.0.2-prepare    #7560   +/-   ##
   ================================================
     Coverage                 ?   31.46%           
     Complexity               ?     1561           
   ================================================
     Files                    ?      434           
     Lines                    ?    14810           
     Branches                 ?     1478           
   ================================================
     Hits                     ?     4660           
     Misses                   ?     9697           
     Partials                 ?      453           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/dolphinscheduler/pull/7560).
   > **Legend**: `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Last update 1693405...720c221.
   





[GitHub] [dolphinscheduler] sonarcloud[bot] commented on pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#issuecomment-1000610832


   SonarCloud Quality Gate failed.
   
   - [24 Bugs](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=7560&resolved=false&types=BUG) (rating E)
   - [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=7560&resolved=false&types=VULNERABILITY) (rating A)
   - [3 Security Hotspots](https://sonarcloud.io/project/security_hotspots?id=apache-dolphinscheduler&pullRequest=7560&resolved=false&types=SECURITY_HOTSPOT) (rating E)
   - [321 Code Smells](https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&pullRequest=7560&resolved=false&types=CODE_SMELL) (rating A)
   
   - [16.8% Coverage](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=7560&metric=new_coverage&view=list)
   - [0.9% Duplication](https://sonarcloud.io/component_measures?id=apache-dolphinscheduler&pullRequest=7560&metric=new_duplicated_lines_density&view=list)
   
   





[GitHub] [dolphinscheduler] caishunfeng commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774405009



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
##########
@@ -142,80 +174,62 @@ public void run() {
     }
 
     /**
-     * persist  taskResponseEvent
-     *
-     * @param taskResponseEvent taskResponseEvent
+     * event handler thread
      */
-    private void persist(TaskResponseEvent taskResponseEvent) {
-        Event event = taskResponseEvent.getEvent();
-        Channel channel = taskResponseEvent.getChannel();
+    class EventHandler extends Thread {
 
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-        switch (event) {
-            case ACK:
+        @Override
+        public void run() {
+            logger.info("event handler thread started");
+            while (Stopper.isRunning()) {
                 try {
-                    if (taskInstance != null) {
-                        ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
-                        boolean result = processService.changeTaskState(taskInstance, status,
-                                taskResponseEvent.getStartTime(),
-                                taskResponseEvent.getWorkerAddress(),
-                                taskResponseEvent.getExecutePath(),
-                                taskResponseEvent.getLogPath(),
-                                taskResponseEvent.getTaskInstanceId());
-                        logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
-                                result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
-                    }
-                    // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
-                    channel.writeAndFlush(taskAckCommand.convert2Command());
-                    logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
+                    eventHandler();
+
+                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
                 } catch (Exception e) {
-                    logger.error("worker ack master error", e);
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : taskInstance.getId());
-                    channel.writeAndFlush(taskAckCommand.convert2Command());
+                    logger.error("event handler thread error", e);
                 }
-                break;
-            case RESULT:
-                try {
-                    boolean result = true;
-                    if (taskInstance != null) {
-                        result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
-                                taskResponseEvent.getEndTime(),
-                                taskResponseEvent.getProcessId(),
-                                taskResponseEvent.getAppIds(),
-                                taskResponseEvent.getTaskInstanceId(),
-                                taskResponseEvent.getVarPool()
-                        );
-                        logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
-                                result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
+            }
+        }
+
+        private void eventHandler() {
+
+            Iterator<Map.Entry<Integer, TaskResponsePersistThread>> iter = processTaskResponseMapper.entrySet().iterator();
+
+            while (iter.hasNext()) {
+                Map.Entry<Integer, TaskResponsePersistThread> entry = iter.next();
+                int processInstanceId = entry.getKey();
+                TaskResponsePersistThread taskResponsePersistThread = entry.getValue();
+                if (taskResponsePersistThread.isEmpty()) {
+                    continue;

Review comment:
       It might be better to add the removal logic here: drop the entry once the process instance has finished and it has no pending task response events.
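
The suggestion above can be sketched as a sweep over the map that removes an entry through the iterator when its queue is empty and its process instance is no longer running. This is only an illustration under assumptions: `FinishedInstanceCleanup`, `sweep`, and the `activeInstanceIds` set are hypothetical, and a plain `Queue<String>` stands in for `TaskResponsePersistThread`. How the real code decides that a process instance has finished is not shown in this thread.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class FinishedInstanceCleanup {

    /**
     * Removes every entry whose event queue is empty and whose process
     * instance id is not in activeInstanceIds (assumed to mean "finished").
     *
     * @return the number of entries removed
     */
    static int sweep(ConcurrentHashMap<Integer, Queue<String>> pending,
                     Set<Integer> activeInstanceIds) {
        int removed = 0;
        Iterator<Map.Entry<Integer, Queue<String>>> iter = pending.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Integer, Queue<String>> entry = iter.next();
            boolean finished = !activeInstanceIds.contains(entry.getKey());
            if (finished && entry.getValue().isEmpty()) {
                // ConcurrentHashMap iterators support remove() and never
                // throw ConcurrentModificationException.
                iter.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Queue<String>> pending = new ConcurrentHashMap<>();
        pending.put(1, new ConcurrentLinkedQueue<>());
        int removed = sweep(pending, Collections.emptySet());
        System.out.println("removed entries: " + removed);
    }
}
```

A producer could still add an event between the `isEmpty()` check and `iter.remove()`; treating an instance as finished only after no further responses can arrive for it is one way to avoid losing such an event.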







[GitHub] [dolphinscheduler] caishunfeng commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
caishunfeng commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774585223



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
##########
@@ -0,0 +1,172 @@
+package org.apache.dolphinscheduler.server.master.processor.queue;

Review comment:
       Please add the license header.










[GitHub] [dolphinscheduler] zwZjut commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
zwZjut commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774355264



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
##########
@@ -0,0 +1,159 @@
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
+public class TaskResponsePersistThread implements Runnable {
+
+    /**
+     * logger of TaskResponsePersistThread
+     */
+    private static final Logger logger = LoggerFactory.getLogger(TaskResponsePersistThread.class);
+
+    private ConcurrentLinkedQueue<TaskResponseEvent> events = new ConcurrentLinkedQueue<>();
+
+    private final Integer processInstanceId;
+
+    /**
+     * process service
+     */
+    private ProcessService processService;
+
+    private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
+
+    public TaskResponsePersistThread(ProcessService processService,
+                                     ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper,
+                                     Integer processInstanceId) {
+        this.processService = processService;
+        this.processInstanceMapper = processInstanceMapper;
+        this.processInstanceId = processInstanceId;
+    }
+
+    @Override
+    public void run() {
+        while (!this.events.isEmpty()) {
+            TaskResponseEvent event = this.events.peek();
+            try {
+                persist(event);
+            } catch (Exception e) {
+                logger.error("persist error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId(), e);
+            } finally {
+                this.events.remove(event);

Review comment:
       The logic is the same as before: the worker will retry sending the event to the master.
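
A minimal, self-contained sketch of the pattern under discussion, for readers following the thread: the loop peeks at an event, tries to persist it, and removes it in `finally` whether or not persisting succeeded. The class name `DrainSketch`, the `drain` helper, and the string events are hypothetical and are not part of the PR. The sketch assumes a single consumer thread per queue, and that a failed event is only recovered if the sender delivers it again, as the comment above states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

public class DrainSketch {

    /**
     * Drains the queue with the same peek / persist / remove-in-finally shape as the diff.
     * Returns the events whose persist call threw. Those events are removed from the
     * queue anyway, so nothing in this loop retries them (hypothetical helper).
     */
    public static <E> List<E> drain(ConcurrentLinkedQueue<E> events, Consumer<E> persist) {
        List<E> failed = new ArrayList<>();
        while (!events.isEmpty()) {
            E event = events.peek();
            try {
                persist.accept(event);
            } catch (RuntimeException e) {
                // The diff logs the error here; the sketch records the event instead.
                failed.add(event);
            } finally {
                // Always dequeue, even after a failure, so one bad event cannot block the rest.
                events.remove(event);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("ack-1");
        queue.add("result-1");

        // Simulate a persistence failure for the second event only.
        List<String> failed = drain(queue, e -> {
            if (e.startsWith("result")) {
                throw new IllegalStateException("simulated persistence failure");
            }
        });

        System.out.println("failed=" + failed + ", remaining=" + queue.size());
    }
}
```

The trade-off is that the master never blocks on an event it cannot persist, at the cost of relying entirely on redelivery from the sender for anything that failed.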







[GitHub] [dolphinscheduler] geosmart commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

Posted by GitBox <gi...@apache.org>.
geosmart commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774822619



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
##########
@@ -66,13 +66,32 @@
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private MasterConfig masterConfig;
+
     /**
      * task response worker
      */
     private Thread taskResponseWorker;
 
+    /**
+     * event handler
+     */
+    private Thread eventHandler;
+
     private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
 
+    private final ConcurrentHashMap<String, TaskResponsePersistThread> eventHandlerMap = new ConcurrentHashMap<>();
+
+    private ListeningExecutorService listeningExecutorService;
+
+    private ExecutorService eventExecService;
+
+    /**
+     * task response mapper
+     */
+    private final ConcurrentHashMap<Integer, TaskResponsePersistThread> processTaskResponseMapper = new ConcurrentHashMap<>();

Review comment:
       The name makes me think the map is a DAO object.
   
   Maybe rename it to `processTaskResponseMap`.



