You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/06/15 02:30:45 UTC
[dolphinscheduler] 06/12: [Improvement-5539][Master] Check status
of taskInstance from cache (#5572)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 053e548bf30aa9af0877364a7b06e7603f1c57d0
Author: blackberrier <li...@163.com>
AuthorDate: Thu Jun 10 09:39:12 2021 +0800
[Improvement-5539][Master] Check status of taskInstance from cache (#5572)
* Improvement: check the status of taskInstance from the cache
* Issue 5572: use a timer instead of while & sleep; consider concurrent modification
* Use computeIfPresent instead of a lock
* Simplify the getByTaskInstanceId function
* Add unit tests for TaskInstanceCacheManagerImpl; fix a bug in TaskInstanceCacheManagerImpl
* Add the Apache license header; add the test class to the root pom
---
.../apache/dolphinscheduler/common/Constants.java | 5 +
.../cache/impl/TaskInstanceCacheManagerImpl.java | 48 +++++-
.../server/master/runner/MasterTaskExecThread.java | 3 +-
.../impl/TaskInstanceCacheManagerImplTest.java | 177 +++++++++++++++++++++
pom.xml | 1 +
5 files changed, 227 insertions(+), 7 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index c366bac..59e65c5 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -531,6 +531,11 @@ public final class Constants {
public static final int SLEEP_TIME_MILLIS = 1000;
/**
+     * Interval, in milliseconds, between refreshes of the master's task instance cache from the database.
+     */
+    public static final int CACHE_REFRESH_TIME_MILLIS = 20_000;
+
+ /**
* heartbeat for zk info length
*/
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index 366a6c4..43632ae 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -14,8 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master.cache.impl;
+import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
+
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -25,8 +28,14 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -47,6 +56,24 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
@Autowired
private ProcessService processService;
+    /**
+     * Daemon timer that periodically refreshes cached task instances from the database.
+     * {@code null} until {@link #init()} has run and again after {@link #close()}.
+     */
+    private Timer refreshTaskInstanceTimer = null;
+
+    /**
+     * Starts the background refresh of the task instance cache (issue #5539).
+     * The first refresh runs after {@code CACHE_REFRESH_TIME_MILLIS}; later refreshes are spaced by the
+     * same delay, measured from the end of the previous run.
+     */
+    @PostConstruct
+    public void init() {
+        if (this.refreshTaskInstanceTimer != null) {
+            // A repeated init() must not leave the previous timer thread running.
+            this.refreshTaskInstanceTimer.cancel();
+        }
+        // Named daemon thread: easy to spot in thread dumps and does not keep the JVM alive.
+        this.refreshTaskInstanceTimer = new Timer("TaskInstanceCacheRefreshTimer", true);
+        // Fixed-delay rather than fixed-rate: if a refresh is delayed (e.g. a slow database), the missed
+        // runs are not replayed back-to-back against the database.
+        this.refreshTaskInstanceTimer.schedule(
+                new RefreshTaskInstanceTimerTask(), CACHE_REFRESH_TIME_MILLIS, CACHE_REFRESH_TIME_MILLIS);
+    }
+
+    /**
+     * Stops the background refresh. Safe to call when {@link #init()} never ran or on repeated calls.
+     */
+    @PreDestroy
+    public void close() {
+        if (this.refreshTaskInstanceTimer != null) {
+            this.refreshTaskInstanceTimer.cancel();
+            this.refreshTaskInstanceTimer = null;
+        }
+    }
/**
* get taskInstance by taskInstance id
@@ -56,12 +83,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
*/
@Override
public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
- TaskInstance taskInstance = taskInstanceCache.get(taskInstanceId);
- if (taskInstance == null){
- taskInstance = processService.findTaskInstanceById(taskInstanceId);
- taskInstanceCache.put(taskInstanceId,taskInstance);
- }
- return taskInstance;
+        TaskInstance cached = taskInstanceCache.get(taskInstanceId);
+        if (cached != null) {
+            return cached;
+        }
+        // Load outside the map: ConcurrentHashMap.computeIfAbsent would hold a bin lock for the whole
+        // database round-trip, blocking concurrent writers (e.g. the refresh task's computeIfPresent).
+        TaskInstance loaded = processService.findTaskInstanceById(taskInstanceId);
+        if (loaded == null) {
+            // Unknown id: nothing to cache, and a ConcurrentHashMap cannot hold null values.
+            return null;
+        }
+        // Another thread may have cached this id meanwhile; return its instance so callers share one object.
+        TaskInstance previous = taskInstanceCache.putIfAbsent(taskInstanceId, loaded);
+        return previous != null ? previous : loaded;
}
/**
@@ -106,6 +128,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());
taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
+ taskInstanceCache.put(taskExecuteResponseCommand.getTaskInstanceId(), taskInstance);
}
/**
@@ -116,4 +139,17 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
public void removeByTaskInstanceId(Integer taskInstanceId) {
taskInstanceCache.remove(taskInstanceId);
}
+
+    /**
+     * Timer task that re-reads every cached task instance from the database. It swaps in the database
+     * copy only when that copy's state is {@link ExecutionStatus#NEED_FAULT_TOLERANCE}; instances in any
+     * other state are left as cached.
+     */
+    class RefreshTaskInstanceTimerTask extends TimerTask {
+
+        @Override
+        public void run() {
+            for (Integer taskInstanceId : taskInstanceCache.keySet()) {
+                try {
+                    refresh(taskInstanceId);
+                } catch (RuntimeException ignored) {
+                    // An exception escaping run() terminates the java.util.Timer thread and silently stops
+                    // every future refresh; skip this id and let the next tick retry it.
+                }
+            }
+        }
+
+        /**
+         * Reloads one task instance and replaces the cached copy if the database reports NEED_FAULT_TOLERANCE.
+         */
+        private void refresh(Integer taskInstanceId) {
+            TaskInstance latest = processService.findTaskInstanceById(taskInstanceId);
+            if (latest != null && latest.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                // computeIfPresent never re-adds an entry that was removed while the query was running.
+                taskInstanceCache.computeIfPresent(taskInstanceId, (id, cached) -> latest);
+            }
+        }
+    }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index 4ffcc22..d01ef0f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -143,7 +143,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
this.checkTimeoutFlag = !alertTimeout();
}
// updateProcessInstance task instance
- taskInstance = processService.findTaskInstanceById(taskInstance.getId());
+ //issue#5539 Check status of taskInstance from cache
+ taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
new file mode 100644
index 0000000..8dc3f80
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cache.impl;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Calendar;
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link TaskInstanceCacheManagerImpl}. Before every test, task instance 0 is cached from
+ * an ack command; {@link ProcessService} is a Mockito mock, so no database is involved.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TaskInstanceCacheManagerImplTest {
+
+    @InjectMocks
+    private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
+
+    @Mock(name = "processService")
+    private ProcessService processService;
+
+    /**
+     * Builds a date in August 1970 in the default time zone. Replaces the deprecated
+     * {@code new Date(1970, Calendar.AUGUST, day)}, whose year argument is an offset from 1900 (year 3870).
+     */
+    private static Date augustOf1970(int dayOfMonth) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.clear();
+        calendar.set(1970, Calendar.AUGUST, dayOfMonth);
+        return calendar.getTime();
+    }
+
+    @Before
+    public void before() {
+
+        TaskExecuteAckCommand taskExecuteAckCommand = new TaskExecuteAckCommand();
+        taskExecuteAckCommand.setStatus(1);
+        taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker");
+        taskExecuteAckCommand.setHost("worker007");
+        taskExecuteAckCommand.setLogPath("/temp/worker.log");
+        taskExecuteAckCommand.setStartTime(augustOf1970(7));
+        taskExecuteAckCommand.setTaskInstanceId(0);
+
+        taskInstanceCacheManager.cacheTaskInstance(taskExecuteAckCommand);
+
+    }
+
+    @Test
+    public void testInit() {
+        // Starting and stopping the refresh timer must not throw. The refresh logic itself is exercised
+        // synchronously below, so no test has to sleep for a whole refresh interval.
+        taskInstanceCacheManager.init();
+        taskInstanceCacheManager.close();
+    }
+
+    @Test
+    public void testRefreshTaskInstanceTimerTask() {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(0);
+        taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+        taskInstance.setExecutePath("/dolphinscheduler/worker");
+        taskInstance.setHost("worker007");
+        taskInstance.setLogPath("/temp/worker.log");
+        taskInstance.setProcessInstanceId(0);
+
+        Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance);
+
+        // Run one refresh tick directly instead of waiting for the timer to fire.
+        taskInstanceCacheManager.new RefreshTaskInstanceTimerTask().run();
+
+        Assert.assertEquals(ExecutionStatus.NEED_FAULT_TOLERANCE,
+                taskInstanceCacheManager.getByTaskInstanceId(0).getState());
+
+    }
+
+    @Test
+    public void testRefreshTaskInstanceTimerTaskKeepsCacheForOtherStates() {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(0);
+        taskInstance.setState(ExecutionStatus.SUCCESS);
+
+        Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance);
+
+        taskInstanceCacheManager.new RefreshTaskInstanceTimerTask().run();
+
+        // Only NEED_FAULT_TOLERANCE rows replace the cached copy; the ack-command state must survive.
+        Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION,
+                taskInstanceCacheManager.getByTaskInstanceId(0).getState());
+
+    }
+
+    @Test
+    public void getByTaskInstanceIdFromCache() {
+        TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(0);
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(0);
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        taskInstance.setExecutePath("/dolphinscheduler/worker");
+        taskInstance.setHost("worker007");
+        taskInstance.setLogPath("/temp/worker.log");
+        taskInstance.setStartTime(augustOf1970(7));
+
+        Assert.assertEquals(taskInstance.toString(), instanceGot.toString());
+
+    }
+
+    @Test
+    public void getByTaskInstanceIdFromDatabase() {
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
+        taskInstance.setExecutePath("/dolphinscheduler/worker");
+        taskInstance.setHost("worker007");
+        taskInstance.setLogPath("/temp/worker.log");
+        taskInstance.setStartTime(augustOf1970(7));
+
+        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+
+        TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(1);
+
+        Assert.assertEquals(taskInstance, instanceGot);
+
+    }
+
+    @Test
+    public void cacheTaskInstanceByTaskExecutionContext() {
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskInstanceId(2);
+        taskExecutionContext.setTaskName("blackberrier test");
+        taskExecutionContext.setStartTime(augustOf1970(7));
+        taskExecutionContext.setTaskType(TaskType.SPARK.getDesc());
+        taskExecutionContext.setExecutePath("/tmp");
+
+        taskInstanceCacheManager.cacheTaskInstance(taskExecutionContext);
+
+        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(2);
+
+        // JUnit convention: expected value first, actual second (affects failure messages).
+        Assert.assertEquals(2, taskInstance.getId());
+        Assert.assertEquals("blackberrier test", taskInstance.getName());
+        Assert.assertEquals(augustOf1970(7), taskInstance.getStartTime());
+        Assert.assertEquals(TaskType.SPARK.getDesc(), taskInstance.getTaskType());
+        Assert.assertEquals("/tmp", taskInstance.getExecutePath());
+
+    }
+
+    @Test
+    public void testCacheTaskInstanceByTaskExecuteAckCommand() {
+        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
+
+        Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstance.getState());
+        Assert.assertEquals(augustOf1970(7), taskInstance.getStartTime());
+        Assert.assertEquals("worker007", taskInstance.getHost());
+        Assert.assertEquals("/dolphinscheduler/worker", taskInstance.getExecutePath());
+        Assert.assertEquals("/temp/worker.log", taskInstance.getLogPath());
+
+    }
+
+    @Test
+    public void testCacheTaskInstanceByTaskExecuteResponseCommand() {
+        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
+        responseCommand.setTaskInstanceId(0);
+        responseCommand.setStatus(9);
+        responseCommand.setEndTime(augustOf1970(8));
+
+        taskInstanceCacheManager.cacheTaskInstance(responseCommand);
+
+        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
+
+        Assert.assertEquals(augustOf1970(8), taskInstance.getEndTime());
+        Assert.assertEquals(ExecutionStatus.KILL, taskInstance.getState());
+
+    }
+
+    @Test
+    public void removeByTaskInstanceId() {
+        taskInstanceCacheManager.removeByTaskInstanceId(0);
+        // After eviction the lookup falls through to the (unstubbed) mock, which returns null.
+        Assert.assertNull(taskInstanceCacheManager.getByTaskInstanceId(0));
+
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 2003b8c..4f9fa3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -966,6 +966,7 @@
<!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>-->
<include>**/server/log/TaskLogFilterTest.java</include>
<include>**/server/log/WorkerLogFilterTest.java</include>
+ <include>**/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java</include>
<include>**/server/master/config/MasterConfigTest.java</include>
<include>**/server/master/consumer/TaskPriorityQueueConsumerTest.java</include>
<include>**/server/master/runner/MasterTaskExecThreadTest.java</include>