Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2021/06/02 09:43:02 UTC

[GitHub] [dolphinscheduler] blackberrier commented on a change in pull request #5572: [Improvement-5539][Master] Check status of taskInstance from cache

blackberrier commented on a change in pull request #5572:
URL: https://github.com/apache/dolphinscheduler/pull/5572#discussion_r643788658



##########
File path: dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
##########
@@ -47,6 +58,33 @@
     @Autowired
     private ProcessService processService;
 
+    @PostConstruct
+    public void init() {
+        super.setName("TaskInstanceCacheRefreshThread");
+        super.start();
+    }
+
+    /**
+     * issue#5539 add thread to fetch task state from database in a fixed rate
+     */
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            try {
+                for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
+                    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
+                    if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
+                        logger.debug("task {} need fault tolerance, update instance cache", taskInstance.getId());
+                        taskInstanceCache.put(taskInstanceEntry.getKey(), taskInstance);

Review comment:
       Hi @ruanwenjun, I don't quite understand the scenario you describe.
   Assume that during one pass of the loop we are checking a task instance, and at that moment `MasterTaskExecThread` removes it. Here "remove" can happen at two levels. The first is removal from the database, in which case `findTaskInstanceById` returns a null `taskInstance`. The second is removal from the cache, which means the task instance has already reached a finished state. At either level, the `if` condition is not satisfied.
   The other scenario is that a task is removed outside the `entrySet` loop, i.e. between two passes. In that case it simply does not appear in the next pass.
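   To make the two removal levels concrete, here is a minimal sketch of the refresh condition from the diff, pulled out as a standalone check. `ExecutionStatus` and `TaskInstance` below are hypothetical stand-ins, not the real DolphinScheduler classes, and `shouldRefresh` / `visited` are illustrative names:

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class RefreshConditionSketch {

       // Hypothetical subset of an execution-status enum, for illustration only.
       enum ExecutionStatus { RUNNING_EXECUTION, SUCCESS, FAILURE, NEED_FAULT_TOLERANCE }

       // Hypothetical minimal stand-in for a task instance row.
       static final class TaskInstance {
           final int id;
           final ExecutionStatus state;

           TaskInstance(int id, ExecutionStatus state) {
               this.id = id;
               this.state = state;
           }
       }

       // Mirrors the `if` in the diff: refresh only when the database row still
       // exists and is marked NEED_FAULT_TOLERANCE.
       static boolean shouldRefresh(TaskInstance fromDb) {
           return fromDb != null && fromDb.state == ExecutionStatus.NEED_FAULT_TOLERANCE;
       }

       // Counts how many cache entries one pass of the refresh loop would visit.
       static int visited(Map<Integer, TaskInstance> cache) {
           int n = 0;
           for (Map.Entry<Integer, TaskInstance> ignored : cache.entrySet()) {
               n++;
           }
           return n;
       }

       public static void main(String[] args) {
           // Level 1: row deleted from the database, so the lookup returns null.
           System.out.println(shouldRefresh(null)); // prints false
           // Level 2: task finished, so the row carries a final state.
           System.out.println(shouldRefresh(new TaskInstance(1, ExecutionStatus.SUCCESS))); // prints false
           // Only a row marked for fault tolerance triggers a cache refresh.
           System.out.println(shouldRefresh(new TaskInstance(1, ExecutionStatus.NEED_FAULT_TOLERANCE))); // prints true

           // Removed between passes: the next pass never visits the entry.
           Map<Integer, TaskInstance> cache = new ConcurrentHashMap<>();
           cache.put(1, new TaskInstance(1, ExecutionStatus.RUNNING_EXECUTION));
           cache.remove(1);
           System.out.println(visited(cache)); // prints 0
       }
   }
   ```

   Both removal levels make the condition false, and an entry removed between passes is never visited.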
   I wrote a demo to test this, listed below. In the timer thread, `Thread.sleep` simulates a time-consuming operation, and another thread removes the entry while the timer thread is in the middle of that operation.
   ```java
   import java.util.Map;
   import java.util.Timer;
   import java.util.TimerTask;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class ConcurrentHashMapRemoveIteratorTest {
   
       private static final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
   
       public static void main(String[] args) {
           map.put("10,", "10");
   
           // Simulates the refresh thread: first run after 1 s, then every 10 s
           // (t = 1 s, 11 s, 21 s, ...). Each visited entry takes 5 s to "process".
           // The Timer thread is not a daemon, so the program runs until it is killed.
           new Timer().scheduleAtFixedRate(
                   new TimerTask() {
                       @Override
                       public void run() {
                           System.out.println("new loop");
                           for (Map.Entry<String, String> entry : map.entrySet()) {
                               System.out.println("begin" + entry.getKey() + entry.getValue());
                               try {
                                   System.out.println("sleeping");
                                   // Time-consuming operation, e.g. the database lookup.
                                   Thread.sleep(5000);
                               } catch (InterruptedException e) {
                                   Thread.currentThread().interrupt();
                               }
                               System.out.println("end" + entry.getKey() + entry.getValue());
                           }
                           System.out.println("new loop end");
                           System.out.println();
                       }
                   }, 1000, 10000
           );
   
           // Simulates MasterTaskExecThread: removes the entry at t = 12 s,
           // i.e. while the second loop (t = 11 s to 16 s) is sleeping.
           new Thread(() -> {
               try {
                   Thread.sleep(12000);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
               map.remove("10,");
               System.out.println("removed!");
           }).start();
       }
   }
   ```
   
   The output is:
   ```
   new loop
   begin10,10
   sleeping
   end10,10
   new loop end
   
   new loop
   begin10,10
   sleeping
   removed!
   end10,10
   new loop end
   
   new loop
   new loop end
   ```
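   The second and third loops can also be replayed without timed sleeps. In this sketch (the class and method names are illustrative), `Thread.join()` forces the removal to happen while the loop body is holding the entry, so the ordering is fixed instead of depending on the 12 s / 5 s timing:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class WeaklyConsistentIterationSketch {

       static List<String> replay() throws InterruptedException {
           Map<String, String> map = new ConcurrentHashMap<>();
           map.put("10,", "10");
           List<String> log = new ArrayList<>(); // only touched by the calling thread

           // Second loop of the demo: the iterator has already handed out the entry.
           for (Map.Entry<String, String> entry : map.entrySet()) {
               log.add("begin" + entry.getKey() + entry.getValue());
               // Another thread removes the key while this iteration is "busy".
               Thread remover = new Thread(() -> map.remove("10,"));
               remover.start();
               remover.join(); // replaces the demo's sleeps and fixes the ordering
               log.add("removed!");
               // The entry object was handed out before the removal, so it still reads 10,10.
               log.add("end" + entry.getKey() + entry.getValue());
           }

           // Third loop of the demo: a fresh pass no longer sees the removed key.
           int visited = 0;
           for (Map.Entry<String, String> ignored : map.entrySet()) {
               visited++;
           }
           log.add("visited " + visited);
           return log;
       }

       public static void main(String[] args) throws InterruptedException {
           replay().forEach(System.out::println);
       }
   }
   ```

   `replay()` returns `begin10,10`, `removed!`, `end10,10`, `visited 0`. `ConcurrentHashMap` iterators are weakly consistent: they never throw `ConcurrentModificationException`, the entry already handed out keeps the value it had at that point, and the next pass over `entrySet()` does not see the removed key.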
   
   Maybe I am missing something in my scenario. Could you point it out? Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org