You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/12/23 02:40:11 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

caishunfeng commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774277894



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
##########
@@ -0,0 +1,159 @@
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
+public class TaskResponsePersistThread implements Runnable {
+
+    /**
+     * logger of TaskResponsePersistThread
+     */
+    private static final Logger logger = LoggerFactory.getLogger(TaskResponsePersistThread.class);
+
+    private ConcurrentLinkedQueue<TaskResponseEvent> events = new ConcurrentLinkedQueue<>();
+
+    private final Integer processInstanceId;
+
+    /**
+     * process service
+     */
+    private ProcessService processService;
+
+    private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
+
+    public TaskResponsePersistThread(ProcessService processService,
+                                     ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper,
+                                     Integer processInstanceId) {
+        this.processService = processService;
+        this.processInstanceMapper = processInstanceMapper;
+        this.processInstanceId = processInstanceId;
+    }
+
+    @Override
+    public void run() {
+        while (!this.events.isEmpty()) {
+            TaskResponseEvent event = this.events.peek();
+            try {
+                persist(event);
+            } catch (Exception e) {
+                logger.error("persist error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId(), e);
+            } finally {
+                this.events.remove(event);

Review comment:
       if exception happend, should the event be removed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org