You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/06/15 02:30:45 UTC

[dolphinscheduler] 06/12: [Improvement-5539][Master] Check status of taskInstance from cache (#5572)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 053e548bf30aa9af0877364a7b06e7603f1c57d0
Author: blackberrier <li...@163.com>
AuthorDate: Thu Jun 10 09:39:12 2021 +0800

    [Improvement-5539][Master] Check status of taskInstance from cache (#5572)
    
    * improvement: check status of taskInstance from cache
    
    * #5572: use a Timer instead of a while/sleep loop; handle concurrent modification
    
    * use computeIfPresent instead of a lock
    
    * simplify getByTaskInstanceId function
    
    * add unit tests for TaskInstanceCacheManagerImpl; fix a bug in TaskInstanceCacheManagerImpl
    
    * add Apache license header; add test class to root pom
---
 .../apache/dolphinscheduler/common/Constants.java  |   5 +
 .../cache/impl/TaskInstanceCacheManagerImpl.java   |  48 +++++-
 .../server/master/runner/MasterTaskExecThread.java |   3 +-
 .../impl/TaskInstanceCacheManagerImplTest.java     | 177 +++++++++++++++++++++
 pom.xml                                            |   1 +
 5 files changed, 227 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index c366bac..59e65c5 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -531,6 +531,11 @@ public final class Constants {
     public static final int SLEEP_TIME_MILLIS = 1000;
 
     /**
+     * interval, in milliseconds, at which the master refreshes its cached task instances from the database
+     */
+    public static final int CACHE_REFRESH_TIME_MILLIS = 20 * 1000;
+
+    /**
      * heartbeat for zk info length
      */
     public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index 366a6c4..43632ae 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -14,8 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.master.cache.impl;
 
+import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
+
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -25,8 +28,14 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -47,6 +56,32 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
     @Autowired
     private ProcessService processService;
 
+    /**
+     * daemon timer that runs RefreshTaskInstanceTimerTask; null until init() is called
+     */
+    private Timer refreshTaskInstanceTimer = null;
+
+    /**
+     * start the cache refresh timer: first run after CACHE_REFRESH_TIME_MILLIS, then at that fixed rate
+     */
+    @PostConstruct
+    public void init() {
+        //issue#5539 fetch task states from the database at a fixed rate on a named daemon thread
+        this.refreshTaskInstanceTimer = new Timer("TaskInstanceCacheRefreshTimer", true);
+        refreshTaskInstanceTimer.scheduleAtFixedRate(
+                new RefreshTaskInstanceTimerTask(), CACHE_REFRESH_TIME_MILLIS, CACHE_REFRESH_TIME_MILLIS
+        );
+    }
+
+    /**
+     * stop the cache refresh timer; safe to call even if init() never ran
+     */
+    @PreDestroy
+    public void close() {
+        if (this.refreshTaskInstanceTimer != null) {
+            this.refreshTaskInstanceTimer.cancel();
+        }
+    }
 
     /**
      * get taskInstance by taskInstance id
@@ -56,12 +83,8 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
      */
     @Override
     public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
-        TaskInstance taskInstance = taskInstanceCache.get(taskInstanceId);
-        if (taskInstance == null){
-            taskInstance = processService.findTaskInstanceById(taskInstanceId);
-            taskInstanceCache.put(taskInstanceId,taskInstance);
-        }
-        return taskInstance;
+        // on a cache miss, load from the database; a null result is not stored (Map.computeIfAbsent contract)
+        return taskInstanceCache.computeIfAbsent(taskInstanceId, id -> processService.findTaskInstanceById(id));
     }
 
     /**
@@ -106,6 +128,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
         TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());
         taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
         taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
+        taskInstanceCache.put(taskExecuteResponseCommand.getTaskInstanceId(), taskInstance);
     }
 
     /**
@@ -116,4 +139,27 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
     public void removeByTaskInstanceId(Integer taskInstanceId) {
         taskInstanceCache.remove(taskInstanceId);
     }
+
+    /**
+     * Timer task that re-reads every cached task instance from the database and replaces the cached
+     * copy when the database reports NEED_FAULT_TOLERANCE, so that state becomes visible to readers
+     * of the cache.
+     */
+    class RefreshTaskInstanceTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            for (Integer taskInstanceId : taskInstanceCache.keySet()) {
+                try {
+                    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
+                    if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                        // computeIfPresent: do not re-add an entry removed while the query was running
+                        taskInstanceCache.computeIfPresent(taskInstanceId, (k, v) -> taskInstance);
+                    }
+                } catch (RuntimeException ignored) {
+                    // an exception escaping run() would terminate the Timer thread and stop all future
+                    // refreshes, so skip this entry for now; it is retried on the next tick
+                }
+            }
+        }
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 4ffcc22..d01ef0f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -143,7 +143,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
                     this.checkTimeoutFlag = !alertTimeout();
                 }
                 // updateProcessInstance task instance
-                taskInstance = processService.findTaskInstanceById(taskInstance.getId());
+                //issue#5539 Check status of taskInstance from cache
+                taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
                 processInstance = processService.findProcessInstanceById(processInstance.getId());
                 Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             } catch (Exception e) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
new file mode 100644
index 0000000..8dc3f80
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cache.impl;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TaskInstanceCacheManagerImplTest {
+
+    @InjectMocks
+    private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
+
+    @Mock(name = "processService")
+    private ProcessService processService;
+
+    /**
+     * Midnight (default time zone) of the given calendar day. Replaces the deprecated
+     * {@code new Date(year, month, day)}, whose year argument is an offset from 1900.
+     */
+    private static Date day(int year, int month, int dayOfMonth) {
+        return new GregorianCalendar(year, month, dayOfMonth).getTime();
+    }
+
+    /**
+     * Seeds the cache with task instance 0 through an ack command (status 1).
+     */
+    @Before
+    public void before() {
+
+        TaskExecuteAckCommand taskExecuteAckCommand = new TaskExecuteAckCommand();
+        taskExecuteAckCommand.setStatus(1);
+        taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker");
+        taskExecuteAckCommand.setHost("worker007");
+        taskExecuteAckCommand.setLogPath("/temp/worker.log");
+        taskExecuteAckCommand.setStartTime(day(1970, Calendar.AUGUST, 7));
+        taskExecuteAckCommand.setTaskInstanceId(0);
+
+        taskInstanceCacheManager.cacheTaskInstance(taskExecuteAckCommand);
+
+    }
+
+    /**
+     * The refresh task must copy a NEED_FAULT_TOLERANCE state from the database into the cache.
+     * It is run directly instead of sleeping past CACHE_REFRESH_TIME_MILLIS for the timer to fire,
+     * which kept this test blocked for over 20 seconds.
+     */
+    @Test
+    public void testInit() {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(0);
+        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+        taskInstance.setExecutePath("/dolphinscheduler/worker");
+        taskInstance.setHost("worker007");
+        taskInstance.setLogPath("/temp/worker.log");
+        taskInstance.setProcessInstanceId(0);
+
+        Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance);
+
+        taskInstanceCacheManager.init();
+        try {
+            Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstanceCacheManager.getByTaskInstanceId(0).getState());
+            taskInstanceCacheManager.new RefreshTaskInstanceTimerTask().run();
+        } finally {
+            // stop the timer thread started by init()
+            taskInstanceCacheManager.close();
+        }
+
+        Assert.assertEquals(taskInstance.getState(), taskInstanceCacheManager.getByTaskInstanceId(0).getState());
+
+    }
+
+    @Test
+    public void getByTaskInstanceIdFromCache() {
+        TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(0);
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(0);
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        taskInstance.setExecutePath("/dolphinscheduler/worker");
+        taskInstance.setHost("worker007");
+        taskInstance.setLogPath("/temp/worker.log");
+        taskInstance.setStartTime(day(1970, Calendar.AUGUST, 7));
+
+        Assert.assertEquals(taskInstance.toString(), instanceGot.toString());
+
+    }
+
+    @Test
+    public void getByTaskInstanceIdFromDatabase() {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        taskInstance.setExecutePath("/dolphinscheduler/worker");
+        taskInstance.setHost("worker007");
+        taskInstance.setLogPath("/temp/worker.log");
+        taskInstance.setStartTime(day(1970, Calendar.AUGUST, 7));
+
+        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+
+        TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(1);
+
+        Assert.assertEquals(taskInstance, instanceGot);
+
+    }
+
+    @Test
+    public void cacheTaskInstanceByTaskExecutionContext() {
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskInstanceId(2);
+        taskExecutionContext.setTaskName("blackberrier test");
+        taskExecutionContext.setStartTime(day(1970, Calendar.AUGUST, 7));
+        taskExecutionContext.setTaskType(TaskType.SPARK.getDesc());
+        taskExecutionContext.setExecutePath("/tmp");
+
+        taskInstanceCacheManager.cacheTaskInstance(taskExecutionContext);
+
+        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(2);
+
+        Assert.assertEquals(2, taskInstance.getId());
+        Assert.assertEquals("blackberrier test", taskInstance.getName());
+        Assert.assertEquals(day(1970, Calendar.AUGUST, 7), taskInstance.getStartTime());
+        Assert.assertEquals(TaskType.SPARK.getDesc(), taskInstance.getTaskType());
+        Assert.assertEquals("/tmp", taskInstance.getExecutePath());
+
+    }
+
+    @Test
+    public void testCacheTaskInstanceByTaskExecuteAckCommand() {
+        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
+
+        Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstance.getState());
+        Assert.assertEquals(day(1970, Calendar.AUGUST, 7), taskInstance.getStartTime());
+        Assert.assertEquals("worker007", taskInstance.getHost());
+        Assert.assertEquals("/dolphinscheduler/worker", taskInstance.getExecutePath());
+        Assert.assertEquals("/temp/worker.log", taskInstance.getLogPath());
+
+    }
+
+    @Test
+    public void testCacheTaskInstanceByTaskExecuteResponseCommand() {
+        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
+        responseCommand.setTaskInstanceId(0);
+        responseCommand.setStatus(9);
+        responseCommand.setEndTime(day(1970, Calendar.AUGUST, 8));
+
+        taskInstanceCacheManager.cacheTaskInstance(responseCommand);
+
+        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
+
+        Assert.assertEquals(day(1970, Calendar.AUGUST, 8), taskInstance.getEndTime());
+        Assert.assertEquals(ExecutionStatus.KILL, taskInstance.getState());
+
+    }
+
+    @Test
+    public void removeByTaskInstanceId() {
+        taskInstanceCacheManager.removeByTaskInstanceId(0);
+        Assert.assertNull(taskInstanceCacheManager.getByTaskInstanceId(0));
+
+    }
+}
diff --git a/pom.xml b/pom.xml
index 2003b8c..4f9fa3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -966,6 +966,7 @@
                         <!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>-->
                         <include>**/server/log/TaskLogFilterTest.java</include>
                         <include>**/server/log/WorkerLogFilterTest.java</include>
+                        <include>**/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java</include>
                         <include>**/server/master/config/MasterConfigTest.java</include>
                         <include>**/server/master/consumer/TaskPriorityQueueConsumerTest.java</include>
                         <include>**/server/master/runner/MasterTaskExecThreadTest.java</include>