Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/12/23 06:44:47 UTC

[GitHub] [dolphinscheduler] zwZjut commented on a change in pull request #7560: [Improvement][MasterServer] event response handle parallel

zwZjut commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774355264



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
##########
@@ -0,0 +1,159 @@
+package org.apache.dolphinscheduler.server.master.processor.queue;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.StateEvent;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
+import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
+public class TaskResponsePersistThread implements Runnable {
+
+    /**
+     * logger of TaskResponsePersistThread
+     */
+    private static final Logger logger = LoggerFactory.getLogger(TaskResponsePersistThread.class);
+
+    private ConcurrentLinkedQueue<TaskResponseEvent> events = new ConcurrentLinkedQueue<>();
+
+    private final Integer processInstanceId;
+
+    /**
+     * process service
+     */
+    private ProcessService processService;
+
+    private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;
+
+    public TaskResponsePersistThread(ProcessService processService,
+                                     ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper,
+                                     Integer processInstanceId) {
+        this.processService = processService;
+        this.processInstanceMapper = processInstanceMapper;
+        this.processInstanceId = processInstanceId;
+    }
+
+    @Override
+    public void run() {
+        while (!this.events.isEmpty()) {
+            TaskResponseEvent event = this.events.peek();
+            try {
+                persist(event);
+            } catch (Exception e) {
+                logger.error("persist error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId(), e);
+            } finally {
+                this.events.remove(event);

Review comment:
       The logic is the same as before: if persisting fails, the event is still removed from the queue, and the worker will retry sending the event to the master.
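
       To make the point concrete, here is a minimal, self-contained sketch of the peek / persist / remove-in-finally pattern from the quoted hunk. It is not the DolphinScheduler code: `DrainSketch`, `Event`, `Persister`, and `drain` are hypothetical stand-ins, and the list of failed ids is added only so the behaviour can be observed. It assumes, as stated above, that a dropped event is re-sent by the worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class DrainSketch {

    /** Hypothetical stand-in for TaskResponseEvent; only an id is modelled. */
    static final class Event {
        final int taskInstanceId;

        Event(int taskInstanceId) {
            this.taskInstanceId = taskInstanceId;
        }
    }

    /** Hypothetical persistence hook standing in for the real persist(event) call. */
    interface Persister {
        void persist(Event event) throws Exception;
    }

    /**
     * Drains the queue in the same shape as the quoted run() method:
     * peek the head, try to persist it, and remove it in finally.
     * A failed event is therefore dropped rather than retried locally;
     * this sketch assumes the sender (the worker) will re-send it.
     * Returns the ids whose persist call threw, for illustration only.
     */
    static List<Integer> drain(ConcurrentLinkedQueue<Event> events, Persister persister) {
        List<Integer> failed = new ArrayList<>();
        while (!events.isEmpty()) {
            Event event = events.peek();
            try {
                persister.persist(event);
            } catch (Exception e) {
                // The real code logs here; the sketch just records the id.
                failed.add(event.taskInstanceId);
            } finally {
                // Removed whether or not persist succeeded, so the loop always makes progress.
                events.remove(event);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Event> events = new ConcurrentLinkedQueue<>();
        events.add(new Event(1));
        events.add(new Event(2));
        events.add(new Event(3));

        // Simulate a persistence failure for event 2 only.
        List<Integer> failed = drain(events, ev -> {
            if (ev.taskInstanceId == 2) {
                throw new IllegalStateException("simulated DB failure");
            }
        });

        System.out.println("failed=" + failed + ", remaining=" + events.size());
        // prints "failed=[2], remaining=0"
    }
}
```

       Removing in `finally` keeps one bad event from blocking everything queued behind it for the same process instance. The trade-off is that the master does not retry by itself, so correctness depends on the worker re-sending the event.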



