You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/05/17 08:36:31 UTC

[dolphinscheduler] branch dev updated: [Bug][Master] fix master task failover (#10065)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0cc0ee77fa [Bug][Master] fix master task failover (#10065)
0cc0ee77fa is described below

commit 0cc0ee77faabf1bed495a55ccd7653c03c26a550
Author: caishunfeng <ca...@gmail.com>
AuthorDate: Tue May 17 16:36:24 2022 +0800

    [Bug][Master] fix master task failover (#10065)
    
    * fix master task failover
    
    * ui
---
 .../master/runner/task/TaskProcessorFactory.java   |   9 ++
 .../server/master/service/FailoverService.java     |  63 ++++++++-----
 .../server/master/service/FailoverServiceTest.java | 103 ++++++++++++++-------
 3 files changed, 114 insertions(+), 61 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
index 2b9e9d644a..0129338649 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
@@ -52,4 +52,13 @@ public class TaskProcessorFactory {
 
         return iTaskProcessor.getClass().newInstance();
     }
+
+    /**
+     * if match master processor, then this task type is processed on the master
+     * @param type
+     * @return
+     */
+    public static boolean isMasterTask(String type) {
+        return PROCESS_MAP.containsKey(type);
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
index 4f25c78dad..2557b659e0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -127,7 +128,11 @@ public class FailoverService {
         long startTime = System.currentTimeMillis();
         List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
         LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
-        List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
+
+        // servers need to contains master hosts and worker hosts, otherwise the logic task will failover fail.
+        List<Server> servers = registryClient.getServerList(NodeType.WORKER);
+        servers.addAll(registryClient.getServerList(NodeType.MASTER));
+
         for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
             if (Constants.NULL.equals(processInstance.getHost())) {
                 continue;
@@ -136,7 +141,7 @@ public class FailoverService {
             List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
             for (TaskInstance taskInstance : validTaskInstanceList) {
                 LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-                failoverTaskInstance(processInstance, taskInstance, workerServers);
+                failoverTaskInstance(processInstance, taskInstance, servers);
             }
 
             if (serverStartupTime != null && processInstance.getRestartTime() != null
@@ -198,29 +203,37 @@ public class FailoverService {
     /**
      * failover task instance
      * <p>
-     * 1. kill yarn job if there are yarn jobs in tasks.
+     * 1. kill yarn job if run on worker and there are yarn jobs in tasks.
      * 2. change task state from running to need failover.
      * 3. try to notify local master
+     * @param processInstance
+     * @param taskInstance
+     * @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers.
      */
-    private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> workerServers) {
+    private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
         if (processInstance == null) {
             LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
                 taskInstance.getProcessInstanceId(), taskInstance.getId());
             return;
         }
-        if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
+        if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
             return;
         }
 
+        boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
+
         taskInstance.setProcessInstance(processInstance);
-        TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
-            .buildTaskInstanceRelatedInfo(taskInstance)
-            .buildProcessInstanceRelatedInfo(processInstance)
-            .create();
-
-        if (masterConfig.isKillYarnJobWhenTaskFailover()) {
-            // only kill yarn job if exists , the local thread has exited
-            ProcessUtils.killYarnJob(taskExecutionContext);
+
+        if (!isMasterTask) {
+            TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
+                .buildTaskInstanceRelatedInfo(taskInstance)
+                .buildProcessInstanceRelatedInfo(processInstance)
+                .create();
+
+            if (masterConfig.isKillYarnJobWhenTaskFailover()) {
+                // only kill yarn job if exists , the local thread has exited
+                ProcessUtils.killYarnJob(taskExecutionContext);
+            }
         }
 
         taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
@@ -256,13 +269,13 @@ public class FailoverService {
     }
 
     /**
-     * task needs failover if task start before worker starts
+     * task needs failover if task start before server starts
      *
-     * @param workerServers worker servers
+     * @param servers servers, can container master servers or worker servers
      * @param taskInstance  task instance
      * @return true if task instance need fail over
      */
-    private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, TaskInstance taskInstance) {
+    private boolean checkTaskInstanceNeedFailover(List<Server> servers, TaskInstance taskInstance) {
 
         boolean taskNeedFailover = true;
 
@@ -279,14 +292,13 @@ public class FailoverService {
             return false;
         }
 
-
         //now no host will execute this task instance,so no need to failover the task
         if (taskInstance.getHost() == null) {
             return false;
         }
 
-        //if task start after worker starts, there is no need to failover the task.
-        if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
+        //if task start after server starts, there is no need to failover the task.
+        if (checkTaskAfterServerStart(servers, taskInstance)) {
             taskNeedFailover = false;
         }
 
@@ -296,19 +308,20 @@ public class FailoverService {
     /**
      * check task start after the worker server starts.
      *
+     * @param servers servers, can contain master servers or worker servers
      * @param taskInstance task instance
-     * @return true if task instance start time after worker server start date
+     * @return true if task instance start time after server start date
      */
-    private boolean checkTaskAfterWorkerStart(List<Server> workerServers, TaskInstance taskInstance) {
+    private boolean checkTaskAfterServerStart(List<Server> servers, TaskInstance taskInstance) {
         if (StringUtils.isEmpty(taskInstance.getHost())) {
             return false;
         }
-        Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost());
-        if (workerServerStartDate != null) {
+        Date serverStartDate = getServerStartupTime(servers, taskInstance.getHost());
+        if (serverStartDate != null) {
             if (taskInstance.getStartTime() == null) {
-                return taskInstance.getSubmitTime().after(workerServerStartDate);
+                return taskInstance.getSubmitTime().after(serverStartDate);
             } else {
-                return taskInstance.getStartTime().after(workerServerStartDate);
+                return taskInstance.getStartTime().after(serverStartDate);
             }
         }
         return false;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index 17a3798090..4e3d99347a 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.service;
 
+import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doNothing;
 
@@ -30,9 +33,11 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 
@@ -46,6 +51,7 @@ import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.context.ApplicationContext;
 import org.springframework.test.util.ReflectionTestUtils;
 
 import com.google.common.collect.Lists;
@@ -72,22 +78,34 @@ public class FailoverServiceTest {
     @Mock
     private WorkflowExecuteThreadPool workflowExecuteThreadPool;
 
-    private String testHost;
+    private static int masterPort = 5678;
+    private static int workerPort = 1234;
+
+    private String testMasterHost;
+    private String testWorkerHost;
     private ProcessInstance processInstance;
-    private TaskInstance taskInstance;
+    private TaskInstance masterTaskInstance;
+    private TaskInstance workerTaskInstance;
 
     @Before
     public void before() throws Exception {
-        given(masterConfig.getListenPort()).willReturn(8080);
+        // init spring context
+        ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
+        SpringApplicationContext springApplicationContext = new SpringApplicationContext();
+        springApplicationContext.setApplicationContext(applicationContext);
+
+        given(masterConfig.getListenPort()).willReturn(masterPort);
 
-        testHost = failoverService.getLocalAddress();
-        String ip = testHost.split(":")[0];
-        int port = Integer.valueOf(testHost.split(":")[1]);
-        Assert.assertEquals(8080, port);
+        testMasterHost = failoverService.getLocalAddress();
+        String ip = testMasterHost.split(":")[0];
+        int port = Integer.valueOf(testMasterHost.split(":")[1]);
+        Assert.assertEquals(masterPort, port);
+
+        testWorkerHost = ip + ":" + workerPort;
 
         given(registryClient.getLock(Mockito.anyString())).willReturn(true);
         given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
-        given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testHost);
+        given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testMasterHost);
         given(registryClient.getStoppable()).willReturn(cause -> {
         });
         given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
@@ -95,30 +113,43 @@ public class FailoverServiceTest {
 
         processInstance = new ProcessInstance();
         processInstance.setId(1);
-        processInstance.setHost(testHost);
+        processInstance.setHost(testMasterHost);
         processInstance.setRestartTime(new Date());
         processInstance.setHistoryCmd("xxx");
         processInstance.setCommandType(CommandType.STOP);
 
-        taskInstance = new TaskInstance();
-        taskInstance.setId(1);
-        taskInstance.setStartTime(new Date());
-        taskInstance.setHost(testHost);
+        masterTaskInstance = new TaskInstance();
+        masterTaskInstance.setId(1);
+        masterTaskInstance.setStartTime(new Date());
+        masterTaskInstance.setHost(testMasterHost);
+        masterTaskInstance.setTaskType(TASK_TYPE_SWITCH);
+
+        workerTaskInstance = new TaskInstance();
+        workerTaskInstance.setId(2);
+        workerTaskInstance.setStartTime(new Date());
+        workerTaskInstance.setHost(testWorkerHost);
+        workerTaskInstance.setTaskType(COMMON_TASK_TYPE);
 
-        given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance));
-        given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testHost));
+        given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(masterTaskInstance, workerTaskInstance));
+        given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testMasterHost));
         given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance));
         doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
-        given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(taskInstance));
+        given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
         given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
 
         Thread.sleep(1000);
-        Server server = new Server();
-        server.setHost(ip);
-        server.setPort(port);
-        server.setCreateTime(new Date());
-        given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server));
-        given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server));
+        Server masterServer = new Server();
+        masterServer.setHost(ip);
+        masterServer.setPort(masterPort);
+        masterServer.setCreateTime(new Date());
+
+        Server workerServer = new Server();
+        workerServer.setHost(ip);
+        workerServer.setPort(workerPort);
+        workerServer.setCreateTime(new Date());
+
+        given(registryClient.getServerList(NodeType.WORKER)).willReturn(new ArrayList<>(Arrays.asList(workerServer)));
+        given(registryClient.getServerList(NodeType.MASTER)).willReturn(new ArrayList<>(Arrays.asList(masterServer)));
         ReflectionTestUtils.setField(failoverService, "registryClient", registryClient);
 
         doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
@@ -132,26 +163,26 @@ public class FailoverServiceTest {
     @Test
     public void failoverMasterTest() {
         processInstance.setHost(Constants.NULL);
-        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
-        failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
-        Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
-
-        processInstance.setHost(testHost);
-        taskInstance.setState(ExecutionStatus.SUCCESS);
-        failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
-        Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
+        masterTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER);
+        Assert.assertNotEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
+
+        processInstance.setHost(testMasterHost);
+        masterTaskInstance.setState(ExecutionStatus.SUCCESS);
+        failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER);
+        Assert.assertNotEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
         Assert.assertEquals(Constants.NULL, processInstance.getHost());
 
-        processInstance.setHost(testHost);
-        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
-        failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
-        Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
+        processInstance.setHost(testMasterHost);
+        masterTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        failoverService.failoverServerWhenDown(testMasterHost, NodeType.MASTER);
+        Assert.assertEquals(masterTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
         Assert.assertEquals(Constants.NULL, processInstance.getHost());
     }
 
     @Test
     public void failoverWorkTest() {
-        failoverService.failoverServerWhenDown(testHost, NodeType.WORKER);
-        Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
+        failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER);
+        Assert.assertEquals(workerTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
     }
 }