You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@dolphinscheduler.apache.org by qiao zhanwei <qi...@outlook.com> on 2019/12/11 09:34:19 UTC

回复: Re: worker deletes queue task, if the db task insert is slow

Have other opinions ?

If not, the master code will be modified to manually guarantee transactions

―――――――――――――
DolphinScheduler(Incubator)  PPMC
Zhanwei Qiao 乔占卫

qiaozhanwei@outlook.com

发件人: qiaozhanwei@outlook.com<ma...@outlook.com>
发送时间: 2019-12-11 10:17
收件人: dev<ma...@dolphinscheduler.apache.org>
主题: Re: Re: worker deletes queue task, if the db task insert is slow
agree to ensure data consistency on the master side

―――――――――――――
DolphinScheduler(Incubator)  PPMC
Zhanwei Qiao 乔占卫

qiaozhanwei@outlook.com

From: guo jiwei<ma...@gmail.com>
Date: 2019-12-10 23:40
To: dev<ma...@dolphinscheduler.apache.org>
Subject: Re: worker deletes queue task, if the db task insert is slow
Hi zhanwei,
   what do you mean "another situation is db task insert is slow" .
   Task should save into db and then save to zk queue orderly by
MasterServer. otherwise , it may occur inaccurate task status or data
inconsistency.  and we have to keep more code to keep it right like the
above way


On Tue, Dec 10, 2019 at 2:47 PM qiao zhanwei <qi...@outlook.com>
wrote:

> Currently the communication between master and worker is through zk as a
> queue
>
> 1.master inserts tasks into zk and db as transactions
>
> @Transactional(rollbackFor = Exception.class)
>     public TaskInstance submitTask(TaskInstance taskInstance,
> ProcessInstance processInstance){
>         logger.info("start submit task : {}, instance id:{}, state: {}, ",
>                 taskInstance.getName(), processInstance.getId(),
> processInstance.getState() );
>         processInstance =
> this.findProcessInstanceDetailById(processInstance.getId());
>         //submit to mysql
>         TaskInstance task= submitTaskInstanceToMysql(taskInstance,
> processInstance);
>         if(task.isSubProcess() && !task.getState().typeIsFinished()){
>             ProcessInstanceMap processInstanceMap =
> setProcessInstanceMap(processInstance, task);
>
>             TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(),
> TaskNode.class);
>             Map<String, String> subProcessParam =
> JSONUtils.toMap(taskNode.getParams());
>             Integer defineId =
> Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
>             createSubWorkProcessCommand(processInstance,
> processInstanceMap, defineId, task);
>         }else if(!task.getState().typeIsFinished()){
>             //submit to task queue
>
> task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
>             submitTaskToQueue(task);
>         }
>         logger.info("submit task :{} state:{} complete, instance id:{}
> state: {}  ",
>                 taskInstance.getName(), task.getState(),
> processInstance.getId(), processInstance.getState());
>         return task;
>     }
>
> 2.worker fetch task from zk and query db.if both exist,submit task to
> execute
> another situation is db task insert is slow . then
>
>     /**
>      * wait for task instance exists, because of db action would be
> delayed.
>      *
>      * @throws Exception exception
>      */
>     private void waitForTaskInstance()throws Exception{
>         int retryTimes = 30;
>         while (taskInstance == null && retryTimes > 0) {
>             Thread.sleep(Constants.SLEEP_TIME_MILLIS);
>             taskInstance = processDao.findTaskInstanceById(taskInstId);
>             retryTimes--;
>         }
>     }
>
> The worker waits for the db 30s to insert the task instance. If it exceeds
> 30s, the task corresponding to the zk queue is deleted.
> This will cause the task to be in the task submited state and the process
> instance to be running.
>
>
>
> In this case :
> master to ensure data consistency or other ?
>
> Welcome to discuss
>
> thx
>
> ―――――――――――――
> DolphinScheduler(Incubator)  PPMC
> Zhanwei Qiao 乔占卫
>
> qiaozhanwei@outlook.com
>