You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/06/02 02:57:42 UTC

[GitHub] [dolphinscheduler] ruanwenjun commented on a change in pull request #5572: [Improvement-5539][Master] Check status of taskInstance from cache

ruanwenjun commented on a change in pull request #5572:
URL: https://github.com/apache/dolphinscheduler/pull/5572#discussion_r643614027



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
##########
@@ -47,6 +58,33 @@
     @Autowired
     private ProcessService processService;
 
+    @PostConstruct
+    public void init() {
+        super.setName("TaskInstanceCacheRefreshThread");
+        super.start();
+    }
+
+    /**
+     * issue#5539 add thread to fetch task state from database in a fixed rate
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {

Review comment:
       You should better not implement like this, if you want to close, you can only change the Stop, but Stop is used in many other places.

##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
##########
@@ -47,6 +58,33 @@
     @Autowired
     private ProcessService processService;
 
+    @PostConstruct
+    public void init() {
+        super.setName("TaskInstanceCacheRefreshThread");
+        super.start();
+    }
+
+    /**
+     * issue#5539 add thread to fetch task state from database in a fixed rate
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            try {
+                for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
+                    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
+                    if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                        logger.debug("task {} need fault tolerance, update instance cache", taskInstance.getId());
+                        taskInstanceCache.put(taskInstanceEntry.getKey(), taskInstance);
+                    }
+                }
+                Thread.sleep(CACHE_REFRESH_TIME_MILLIS);

Review comment:
       You can use Timer to implement, rather than while(true) sleep

##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
##########
@@ -149,7 +143,10 @@ public Boolean waitTaskQuit() {
                     this.checkTimeoutFlag = !alertTimeout();
                 }
                 // updateProcessInstance task instance
-                taskInstance = processService.findTaskInstanceById(taskInstance.getId());
+                //taskInstance = processService.findTaskInstanceById(taskInstance.getId());

Review comment:
       Remove the unused code.

##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
##########
@@ -47,6 +58,33 @@
     @Autowired
     private ProcessService processService;
 
+    @PostConstruct
+    public void init() {
+        super.setName("TaskInstanceCacheRefreshThread");
+        super.start();
+    }
+
+    /**
+     * issue#5539 add thread to fetch task state from database in a fixed rate
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            try {
+                for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
+                    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
+                    if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                        logger.debug("task {} need fault tolerance, update instance cache", taskInstance.getId());
+                        taskInstanceCache.put(taskInstanceEntry.getKey(), taskInstance);

Review comment:
       You need to consider the concurrency modification. For example, if there is a thread in `MasterTaskExecThread` remove the instance at the same time, these may cause a memory leak. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org