Posted to dev@dolphinscheduler.apache.org by qiao zhanwei <qi...@outlook.com> on 2019/12/10 06:46:56 UTC

worker deletes queue task, if the db task insert is slow

Currently, the master and the worker communicate through ZooKeeper (zk), which is used as a task queue.

1. The master writes the task to both the DB and zk inside one @Transactional method:

    @Transactional(rollbackFor = Exception.class)
    public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance) {
        logger.info("start submit task : {}, instance id:{}, state: {}, ",
                taskInstance.getName(), processInstance.getId(), processInstance.getState());
        processInstance = this.findProcessInstanceDetailById(processInstance.getId());
        // submit to MySQL; other connections normally cannot see this row
        // until the surrounding transaction commits
        TaskInstance task = submitTaskInstanceToMysql(taskInstance, processInstance);
        if (task.isSubProcess() && !task.getState().typeIsFinished()) {
            ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);

            TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
            Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
            Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
            createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
        } else if (!task.getState().typeIsFinished()) {
            // submit to the task queue; the zk write takes effect immediately and
            // is neither delayed nor rolled back by the DB transaction
            task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
            submitTaskToQueue(task);
        }
        logger.info("submit task :{} state:{} complete, instance id:{} state: {}  ",
                taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
        return task;
    }

2. The worker fetches the task from zk and queries the DB. If both exist, it submits the task for execution.
If the DB insert is slow (the master's transaction has not committed yet), the worker waits for the row to appear:

    /**
     * Wait until the task instance exists in the DB, because the DB write
     * may become visible later than the zk queue entry.
     *
     * @throws Exception exception
     */
    private void waitForTaskInstance() throws Exception {
        // 30 polls, SLEEP_TIME_MILLIS apart: the ~30 s wait described below
        int retryTimes = 30;
        while (taskInstance == null && retryTimes > 0) {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            taskInstance = processDao.findTaskInstanceById(taskInstId);
            retryTimes--;
        }
    }

The worker waits up to 30 s for the task instance to appear in the DB. If it is still missing after 30 s, the worker deletes the corresponding task from the zk queue.
As a result, the task is left in the submitted state while the process instance remains running.
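To make the failure mode concrete, here is a single-threaded Java simulation, a sketch under simplifying assumptions: FakeTaskDb, SlowCommitRace and the "SUBMITTED" state string are hypothetical stand-ins, not DolphinScheduler classes, and a slow commit is modeled as the worker poll on which the master's transaction commits rather than as wall-clock time.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the task-instance table: rows inserted inside an
// open transaction are invisible to other readers until commit() is called.
class FakeTaskDb {
    private final Map<Integer, String> committed = new HashMap<>();
    private final Map<Integer, String> pending = new HashMap<>();

    void insertUncommitted(int id, String state) { pending.put(id, state); }

    void commit() { committed.putAll(pending); pending.clear(); }

    String find(int id) { return committed.get(id); } // null until committed
}

class SlowCommitRace {
    static final int RETRY_TIMES = 30; // same budget as waitForTaskInstance()

    /**
     * Models the current ordering: the master pushes the task id to zk while
     * its DB transaction is still open, and the transaction commits on worker
     * poll number commitOnPoll. Returns "EXECUTED" if the worker found the row,
     * otherwise a description of the stuck state.
     */
    static String run(int commitOnPoll) {
        FakeTaskDb db = new FakeTaskDb();
        Deque<Integer> zkQueue = new ArrayDeque<>();
        int taskId = 42;

        // Master, inside one @Transactional method: DB insert, then zk push.
        db.insertUncommitted(taskId, "SUBMITTED");
        zkQueue.add(taskId); // visible to the worker immediately

        // Worker: take the id off the queue, then poll like waitForTaskInstance().
        int id = zkQueue.poll();
        String state = null;
        for (int poll = 1; state == null && poll <= RETRY_TIMES; poll++) {
            if (poll >= commitOnPoll) {
                db.commit(); // the slow master transaction finally commits
            }
            state = db.find(id);
        }
        if (state != null) {
            return "EXECUTED";
        }

        // The worker gave up and the zk entry is gone; the commit lands later.
        db.commit();
        return "STUCK state=" + db.find(id) + " queueEmpty=" + zkQueue.isEmpty();
    }
}
```

`SlowCommitRace.run(10)` returns "EXECUTED", while `SlowCommitRace.run(31)` returns "STUCK state=SUBMITTED queueEmpty=true": the row does exist in the end, but no queue entry is left to pick it up.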



In this case, should the master be responsible for ensuring data consistency, or is there another approach?

Comments are welcome.

Thanks

―――――――――――――
DolphinScheduler(Incubator)  PPMC
Zhanwei Qiao 乔占卫

qiaozhanwei@outlook.com

Re: Re: worker deletes queue task, if the db task insert is slow

Posted by qiao zhanwei <qi...@outlook.com>.
Agreed: data consistency should be ensured on the master side.


From: guo jiwei<ma...@gmail.com>
Date: 2019-12-10 23:40
To: dev<ma...@dolphinscheduler.apache.org>
Subject: Re: worker deletes queue task, if the db task insert is slow
Hi zhanwei,
   What do you mean by "another situation is db task insert is slow"?
   The MasterServer should save the task into the DB first and then push it
to the zk queue, in that order. Otherwise, task status may be inaccurate or
data may become inconsistent, and we have to maintain extra code, like the
wait loop above, to keep it correct.



Re: Re: Re: worker deletes queue task, if the db task insert is slow

Posted by qiao zhanwei <qi...@outlook.com>.
Does anyone have other opinions?

If not, I will modify the master code to manage the transaction manually and guarantee consistency.
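One possible shape for a manually managed submission on the master, sketched under assumptions that are not taken from the thread: the DB row is committed in its own short transaction, the zk push is retried a bounded number of times, and if every attempt fails the row is moved out of the submitted state so it cannot hang. TaskQueue, FlakyQueue, MasterSubmitter, maxPushAttempts and the "SUBMITTED"/"FAILURE" strings are hypothetical; a HashMap stands in for committed DB rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical abstraction standing in for the zk task queue.
interface TaskQueue {
    void push(int taskId) throws Exception;
}

// Test double: throws on the first `failures` pushes, then succeeds.
class FlakyQueue implements TaskQueue {
    private int failures;
    final List<Integer> items = new ArrayList<>();

    FlakyQueue(int failures) { this.failures = failures; }

    @Override
    public void push(int taskId) throws Exception {
        if (failures > 0) {
            failures--;
            throw new Exception("zk unavailable");
        }
        items.add(taskId);
    }
}

class MasterSubmitter {
    final Map<Integer, String> taskTable = new HashMap<>(); // committed rows only
    private final TaskQueue queue;
    private final int maxPushAttempts;

    MasterSubmitter(TaskQueue queue, int maxPushAttempts) {
        this.queue = queue;
        this.maxPushAttempts = maxPushAttempts;
    }

    /** Returns true if the task reached the queue. */
    boolean submit(int taskId) {
        // 1. Commit the DB row first (a short, separate transaction in real code).
        taskTable.put(taskId, "SUBMITTED");
        // 2. Push to the queue with bounded retries.
        for (int attempt = 1; attempt <= maxPushAttempts; attempt++) {
            try {
                queue.push(taskId);
                return true;
            } catch (Exception e) {
                // a real implementation would log and back off before retrying
            }
        }
        // 3. Compensate: do not leave a committed SUBMITTED row with no queue entry.
        taskTable.put(taskId, "FAILURE");
        return false;
    }
}
```

Marking the task failed is only one choice; a master could instead keep the row in the submitted state and re-enqueue it later from a periodic scan of submitted tasks.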


From: qiaozhanwei@outlook.com<ma...@outlook.com>
Date: 2019-12-11 10:17
To: dev<ma...@dolphinscheduler.apache.org>
Subject: Re: Re: worker deletes queue task, if the db task insert is slow
Agreed: data consistency should be ensured on the master side.


Re: worker deletes queue task, if the db task insert is slow

Posted by guo jiwei <te...@gmail.com>.
Hi zhanwei,
   What do you mean by "another situation is db task insert is slow"?
   The MasterServer should save the task into the DB first and then push it
to the zk queue, in that order. Otherwise, task status may be inaccurate or
data may become inconsistent, and we have to maintain extra code, like the
wait loop above, to keep it correct.
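A minimal sketch of the ordering described here (commit to the DB first, then push to zk), using hypothetical in-memory stand-ins: TxTaskTable, OrderedSubmit and the "SUBMITTED" string are illustrative names, not DolphinScheduler code, and an ArrayDeque plays the role of the zk queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the task-instance table: rows inserted inside an
// open transaction are invisible to other readers until commit() is called.
class TxTaskTable {
    private final Map<Integer, String> committed = new HashMap<>();
    private final Map<Integer, String> pending = new HashMap<>();

    void insert(int id, String state) { pending.put(id, state); }

    void commit() { committed.putAll(pending); pending.clear(); }

    String find(int id) { return committed.get(id); }
}

class OrderedSubmit {
    // Master: commit the DB transaction first, only then publish the id to zk.
    static void submitTask(TxTaskTable db, Deque<Integer> zkQueue, int taskId) {
        db.insert(taskId, "SUBMITTED");
        db.commit();         // step 1: the row is committed and visible
        zkQueue.add(taskId); // step 2: any id a worker sees already has a row
    }

    // Worker: a single DB lookup is enough; no wait loop is needed.
    static String workerConsume(TxTaskTable db, Deque<Integer> zkQueue) {
        Integer id = zkQueue.poll();
        if (id == null) {
            return "NO_TASK";
        }
        return db.find(id) != null ? "EXECUTE " + id : "MISSING " + id;
    }
}
```

With this ordering the worker never observes a queue entry without its row. The remaining gap is a master failure between the commit and the zk push, which leaves a committed submitted row with no queue entry; that case still needs handling on the master side.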

