Posted to dev@dolphinscheduler.apache.org by guo jiwei <te...@gmail.com> on 2019/12/09 11:19:35 UTC

A proposal for DolphinScheduler- refactor WorkerServer/MasterServer

Hello everyone,

    I would like to share some ideas about refactoring
WorkerServer/MasterServer for DolphinScheduler.

Background
   In the current implementation of DolphinScheduler, task info is stored in
ZooKeeper, and each worker-server relies on a ZooKeeper lock to keep picking
up tasks. Each worker tries to acquire the lock; if it gets the lock, it can
take a task (fetch the task from ZK, load the task info from the DB, etc.),
otherwise it has to wait for the lock. This design hurts performance, adds
dependencies (every worker needs the ZK lock and a DB connection), and
limits parallelism.
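To make the bottleneck concrete, here is a small Python simulation. It is a sketch under stated assumptions, not DolphinScheduler code: a `threading.Lock` stands in for the ZooKeeper lock, a list stands in for the queued tasks, and a short sleep stands in for the ZK read plus DB lookup. All names are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical model of the lock-based design. zk_lock stands in for the
# ZooKeeper lock; a worker may only fetch a task while holding it.
zk_lock = threading.Lock()
pending = list(range(6))         # task ids waiting in "ZooKeeper"
stats_lock = threading.Lock()    # protects the counters below
in_fetch = 0                     # workers currently inside the fetch step
max_in_fetch = 0                 # largest overlap observed


def fetch_task():
    """Fetch one task under the global lock; return None when none are left."""
    global in_fetch, max_in_fetch
    with zk_lock:
        if not pending:
            return None
        with stats_lock:
            in_fetch += 1
            max_in_fetch = max(max_in_fetch, in_fetch)
        task_id = pending.pop(0)
        time.sleep(0.01)         # simulated ZK read + DB lookup
        with stats_lock:
            in_fetch -= 1
        return task_id


def worker_loop():
    """Keep fetching until the queue is empty; running the task is omitted."""
    fetched = []
    while (task_id := fetch_task()) is not None:
        fetched.append(task_id)
    return fetched


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(worker_loop) for _ in range(3)]
    fetched = sorted(t for f in futures for t in f.result())

print(fetched)        # [0, 1, 2, 3, 4, 5]
print(max_in_fetch)   # 1: three workers, but only one can fetch at a time
```

However many workers are added, only one can be inside the fetch step at a time, which is the serialization the proposal aims to remove.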

Proposal
    I suggest that the worker-server listen on a TCP port and receive task
commands via RPC requests instead. This way, the worker-server no longer
needs a lock or a DB connection, and it can execute tasks concurrently.
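A minimal Python sketch of the proposed worker side, assuming the RPC layer simply calls a handler for each received command (the `WorkerServer` class and `on_command` method are hypothetical names). Each command is handed to a thread pool, so tasks run concurrently without any shared lock:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class WorkerServer:
    """Hypothetical worker: task commands arrive over RPC (simulated here by
    calling on_command directly) and run on a local thread pool. There is no
    ZooKeeper lock and no DB connection on this path."""

    def __init__(self, pool_size=4):
        self.pool = ThreadPoolExecutor(max_workers=pool_size)
        self._stats_lock = threading.Lock()   # guards the counters only
        self.running = 0
        self.max_running = 0

    def on_command(self, task_id, work):
        """Handle one task command; returns a Future without waiting for it."""
        return self.pool.submit(self._run, task_id, work)

    def _run(self, task_id, work):
        with self._stats_lock:
            self.running += 1
            self.max_running = max(self.max_running, self.running)
        try:
            return task_id, work()
        finally:
            with self._stats_lock:
                self.running -= 1


# Three tasks that can only finish if all three run at the same time.
barrier = threading.Barrier(3, timeout=5)
server = WorkerServer(pool_size=4)
futures = [server.on_command(i, lambda: barrier.wait() >= 0) for i in range(3)]
results = [f.result() for f in futures]
server.pool.shutdown()
print(results)              # [(0, True), (1, True), (2, True)]
print(server.max_running)   # 3
```

The barrier makes concurrency checkable: the three tasks can only complete if they are running at the same time.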

General Implementation
   1.  Refactor the worker-server into a TCP server listening on a port,
using Netty for TCP communication, and define our own binary protocol for
scheduling.
   2.  After the worker-server starts, it registers itself in ZooKeeper as
an ephemeral node, e.g.
/dolphinscheduler/nodes/worker/test/xxx.xxx.xxx.xxx:9800
(pattern: /dolphinscheduler/nodes/worker/$workerGroup/ip:port).
   3.  The MasterServer takes responsibility for triggering tasks and
choosing a worker-server to execute each one:
     - first, it listens for worker nodes in ZooKeeper and caches the worker
list in memory;
     - second, when the MasterScheduleThread acquires the lock to execute a
command (t_ds_command), it extracts the task info from the process
instance, establishes a TCP connection to an available target worker chosen
according to the task info, and sends the command to that worker.
   4.  When the worker-server receives a task command, it deserializes the
command into a Task and executes it in a thread pool or a subprocess.
   5.  After the worker-server executes the task, it reports the result to
the MasterServer over the already-established socket. If that socket has
been closed for any reason, the WorkerServer has to connect to another
MasterServer to report the result.
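Step 1 leaves the binary protocol open. As an illustration only, here is one possible frame layout in Python: a fixed 16-byte header (magic, version, command type, request id, body length) followed by a JSON body. The layout, the magic value, and the command codes are assumptions, not DolphinScheduler's actual format. The decoder shows why a length field is needed on a TCP stream; in Netty this job is usually done by a length-field frame decoder such as `LengthFieldBasedFrameDecoder`.

```python
import json
import struct

# Hypothetical frame layout (big-endian); NOT an existing DolphinScheduler format:
#   magic u16 | version u8 | command_type u8 | opaque i64 | body_length u32 | body
HEADER = struct.Struct(">HBBqI")   # 16 bytes
MAGIC = 0xBABE
VERSION = 1
TASK_EXECUTE_REQUEST = 1           # hypothetical command-type codes
TASK_EXECUTE_RESPONSE = 2


def encode(command_type, opaque, payload):
    """Serialize one command; `opaque` is a request id echoed back in the response."""
    body = json.dumps(payload).encode("utf-8")
    return HEADER.pack(MAGIC, VERSION, command_type, opaque, len(body)) + body


class FrameDecoder:
    """Reassembles frames from a TCP byte stream. TCP keeps no message
    boundaries, so one read may hold part of a frame or several frames; the
    length field says where each frame ends."""

    def __init__(self):
        self.buf = bytearray()

    def feed(self, data):
        self.buf += data
        frames = []
        while len(self.buf) >= HEADER.size:
            magic, version, ctype, opaque, length = HEADER.unpack_from(self.buf)
            if magic != MAGIC or version != VERSION:
                raise ValueError(f"bad frame header: magic={magic:#x} version={version}")
            end = HEADER.size + length
            if len(self.buf) < end:
                break                         # body not fully received yet
            body = bytes(self.buf[HEADER.size:end])
            del self.buf[:end]
            frames.append((ctype, opaque, json.loads(body)))
        return frames


frame = encode(TASK_EXECUTE_REQUEST, 42, {"taskInstanceId": 7})
decoder = FrameDecoder()
print(decoder.feed(frame[:10]))           # []  (header incomplete)
print(decoder.feed(frame[10:] + frame))   # two frames, each (1, 42, {'taskInstanceId': 7})
```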

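Steps 2 and 3 can be sketched together: each worker's ephemeral node path encodes its group and address, and the master keeps an in-memory map from worker group to live workers. This Python sketch assumes a ZooKeeper watcher calls `on_added`/`on_removed` with full node paths; the class and method names are hypothetical.

```python
from collections import defaultdict

WORKER_ROOT = "/dolphinscheduler/nodes/worker/"


def parse_worker_path(path):
    """Split '/dolphinscheduler/nodes/worker/$workerGroup/ip:port' into
    (group, host, port)."""
    if not path.startswith(WORKER_ROOT):
        raise ValueError(f"not a worker node: {path}")
    group, _, address = path[len(WORKER_ROOT):].partition("/")
    host, _, port = address.rpartition(":")
    if not group or not host or not port.isdigit():
        raise ValueError(f"malformed worker node: {path}")
    return group, host, int(port)


class WorkerCache:
    """Master-side in-memory worker list. on_added/on_removed are meant to be
    driven by ZooKeeper child events under WORKER_ROOT; that watcher wiring is
    assumed and omitted here."""

    def __init__(self):
        self._groups = defaultdict(set)

    def on_added(self, path):
        group, host, port = parse_worker_path(path)
        self._groups[group].add((host, port))

    def on_removed(self, path):
        # An ephemeral node vanishes when the worker's ZooKeeper session ends.
        group, host, port = parse_worker_path(path)
        self._groups[group].discard((host, port))

    def workers(self, group):
        return sorted(self._groups.get(group, ()))


cache = WorkerCache()
cache.on_added(WORKER_ROOT + "test/10.0.0.5:9800")
cache.on_added(WORKER_ROOT + "test/10.0.0.6:9800")
cache.on_removed(WORKER_ROOT + "test/10.0.0.5:9800")
print(cache.workers("test"))   # [('10.0.0.6', 9800)]
```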

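Step 5's reporting rule (try the original master first, otherwise any other master) might look like this in Python. `send` is an assumed transport function that raises `ConnectionError` when a master is unreachable; the master names are made up.

```python
class ReportFailed(Exception):
    """Raised when no master accepted a task result."""


def report_result(result, origin_master, masters, send):
    """Report to the master that dispatched the task first, then fall back to
    the other known masters. `send(master, result)` is an assumed transport
    call that raises ConnectionError when the master cannot be reached."""
    candidates = [origin_master] + [m for m in masters if m != origin_master]
    errors = {}
    for master in candidates:
        try:
            send(master, result)
            return master                 # delivered; stop trying
        except ConnectionError as exc:
            errors[master] = exc
    raise ReportFailed(f"no master accepted the result: {errors}")


delivered = []
down = {"master-1"}


def fake_send(master, result):
    if master in down:
        raise ConnectionError(f"{master} unreachable")
    delivered.append((master, result))


print(report_result({"taskId": 7, "state": "SUCCESS"}, "master-1",
                    ["master-1", "master-2"], fake_send))   # master-2
```

A master that receives such a fallback report did not dispatch the task itself, so it has to be able to look up the task instance before accepting the result; that is where master fault tolerance comes in.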
More details on MasterServer/WorkerServer can be discussed in later mails.

Simple graph

[image: image.png]




Tboy
technoboy@apache.org

Re: A proposal for DolphinScheduler- refactor WorkerServer/MasterServer

Posted by JUN GAO <ga...@gmail.com>.
The worker-server no longer uses a lock or connects to the DB, and it can
execute tasks concurrently. It's a good idea.
I have some ideas about fault tolerance:

1、Worker node down:
Here are the key steps:
a、From the task instance table, the master can find the worker node on which
a task is running.
b、The worker node reports task status to the master node.
c、After the task is submitted to the worker node, the master node
periodically checks whether the task has exceeded its timeout. If it has,
the master tries to ping the worker node running the task. If the ping
fails, the master considers that worker node down, assigns the task to
another worker node, and updates the worker node in the task instance table.
If the master later receives a task status report from the worker node it
has declared down, it must discard that report.

2、Master node down:
a、The worker node can cache each task's scheduling node (master node); we
can call this task_master_map. When a task's status changes, the worker node
looks up the task's master node in task_master_map and reports the status to
that master node.
b、Because the master node schedules the DAG, if a master goes down, the other
master nodes can contend for a lock to decide which one takes over the DAGs
running on the failed master. Once a master has taken over a DAG, it reads
the workflow DAG from the DB and finds the running tasks. From the task
instance table it can find out which worker nodes are running these tasks,
and then notify those worker nodes via TCP/RPC to update their
task_master_map.
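The worker-down procedure in item 1 (periodic timeout check, ping, reassign, discard stale reports) can be sketched in Python as follows. `ping` and `pick_worker` are assumed callbacks, the class name is hypothetical, and an injectable clock keeps the example deterministic. Actually resending the command to the new worker is omitted.

```python
import time
from dataclasses import dataclass


@dataclass
class TaskInstance:
    task_id: int
    worker: str            # worker recorded in the task instance table
    started_at: float
    timeout: float         # seconds
    state: str = "RUNNING"


class WorkerFailoverMonitor:
    """Master-side check for dead workers. `ping(worker) -> bool` and
    `pick_worker(exclude) -> str` are assumed callbacks."""

    def __init__(self, ping, pick_worker, clock=time.monotonic):
        self.ping, self.pick_worker, self.clock = ping, pick_worker, clock
        self.tasks = {}

    def submit(self, task_id, worker, timeout):
        self.tasks[task_id] = TaskInstance(task_id, worker, self.clock(), timeout)

    def check(self):
        """Run periodically: reassign timed-out tasks whose worker does not answer a ping."""
        now = self.clock()
        for task in self.tasks.values():
            timed_out = now - task.started_at > task.timeout
            if task.state == "RUNNING" and timed_out and not self.ping(task.worker):
                task.worker = self.pick_worker(exclude=task.worker)  # update the table
                task.started_at = now

    def on_report(self, task_id, worker, state):
        """Accept a status report only from the worker currently on record."""
        task = self.tasks.get(task_id)
        if task is None or task.worker != worker:
            return False   # stale report from a worker already declared down
        task.state = state
        return True


now = [0.0]
monitor = WorkerFailoverMonitor(ping=lambda w: w != "worker-1",
                                pick_worker=lambda exclude: "worker-2",
                                clock=lambda: now[0])
monitor.submit(1, "worker-1", timeout=30)
now[0] = 31.0
monitor.check()
print(monitor.tasks[1].worker)                       # worker-2
print(monitor.on_report(1, "worker-1", "SUCCESS"))   # False
print(monitor.on_report(1, "worker-2", "SUCCESS"))   # True
```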

These are my ideas.


(In reply to 李 岗 <lg...@outlook.com>, Wed, Dec 11, 2019, 2:38 PM.)


Re: A proposal for DolphinScheduler- refactor WorkerServer/MasterServer

Posted by 李 岗 <lg...@outlook.com>.
> In this way, worker-server is not using lock or connecting db anymore. And it can execute task concurrently.
It's good. I agree with it.

In real production environments, the network is often unreliable, and the master or worker may hit exceptions.
I'm more interested in fault tolerance and hope we can discuss it further.

For example, consider fault tolerance for the worker:
1、The master has successfully pushed a task and it is running on a worker, but then the worker goes down.
If this happens, how is the task taken over by another worker?
If the master caches the worker list in memory and is about to resend the task to another worker, but this master also goes down at that moment,
can another master get this master's worker list, and can we make sure that only one master resends the task to one of the other workers?

2、The worker finishes executing the task but goes down before reporting the result to the master.
If this happens, how is the task's status updated? Currently, the master chooses another worker to execute the task again.
But for a long-running task, this approach also needs improvement.
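One way to approach question 1: if every master builds its worker list from ZooKeeper (as in step 3 of the proposal), a surviving master does not need the failed master's in-memory copy. What must be coordinated is that only one master resends the task. Creating a ZooKeeper node fails when the node already exists, so creating a per-task node can act as a claim. The Python sketch below simulates that with an in-memory registry; the key format and function names are hypothetical.

```python
import threading


class ClaimRegistry:
    """In-memory stand-in for ZooKeeper node creation: creating a node that
    already exists fails, so exactly one caller wins each key."""

    def __init__(self):
        self._lock = threading.Lock()
        self._owners = {}

    def try_create(self, key, owner):
        with self._lock:
            if key in self._owners:
                return False
            self._owners[key] = owner
            return True


def take_over(task_id, master, registry, live_workers, resend):
    """Called by every surviving master; only the claim winner resends the task.
    `live_workers` comes from this master's own ZooKeeper-backed cache, not
    from the failed master's memory."""
    if not live_workers:
        return False
    if not registry.try_create(f"failover/task-{task_id}", master):
        return False                       # another master already took it
    resend(task_id, live_workers[0])       # worker choice simplified here
    return True


registry = ClaimRegistry()
resent = []
threads = [threading.Thread(target=take_over,
                            args=(7, f"master-{i}", registry, ["worker-2"],
                                  lambda t, w: resent.append((t, w))))
           for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(resent)   # [(7, 'worker-2')]
```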

________________________________
DolphinScheduler(Incubator) PPMC
Gang Li 李岗

lgcareer2019@outlook.com


Re: Re: A proposal for DolphinScheduler- refactor WorkerServer/MasterServer

Posted by JUN GAO <ga...@gmail.com>.
Because the master schedules the DAG, if a master goes down, the DAG needs to
be recovered on another master node. That master then needs to load the DAG
from the DB and find the tasks that are still running. These tasks are
running on some worker nodes, and when a task's status changes, the worker
needs to know which master node is now the task's scheduling node.
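A minimal Python sketch of the worker-side task_master_map described in this thread: record the scheduling master when a task arrives, rewrite entries when a takeover notification comes in, and look up the current master before reporting. The class and method names are hypothetical, and the RPC plumbing is omitted.

```python
import threading


class TaskMasterMap:
    """Worker-side task_master_map: task id -> the master that schedules it."""

    def __init__(self):
        self._lock = threading.Lock()   # RPC handlers and task threads share the map
        self._masters = {}

    def on_task_received(self, task_id, master):
        with self._lock:
            self._masters[task_id] = master

    def on_master_changed(self, task_ids, new_master):
        """Handle the notification sent by the master that took over a DAG."""
        with self._lock:
            for task_id in task_ids:
                if task_id in self._masters:    # ignore tasks already finished here
                    self._masters[task_id] = new_master

    def report_target(self, task_id):
        """Master to send the next status report for this task to (None if unknown)."""
        with self._lock:
            return self._masters.get(task_id)

    def on_task_finished(self, task_id):
        with self._lock:
            self._masters.pop(task_id, None)


task_masters = TaskMasterMap()
task_masters.on_task_received(1, "master-1")
task_masters.on_task_received(2, "master-2")
task_masters.on_master_changed([1, 9], "master-3")   # master-1 went down; task 9 is unknown
print(task_masters.report_target(1), task_masters.report_target(2))   # master-3 master-2
print(task_masters.report_target(9))                                  # None
```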


(In reply to qiao zhanwei <qi...@outlook.com>, Mon, Dec 9, 2019, 10:22 PM.)


Re: Re: A proposal for DolphinScheduler- refactor WorkerServer/MasterServer

Posted by qiao zhanwei <qi...@outlook.com>.
Very good!

Can this be achieved in stages?

First, get one master and one worker communicating.

Main points:
    1, the master sends tasks to the worker via RPC (task_queue, kill_queue)
    2, the worker receives the task and executes it
    3, the master monitors the worker's task execution results (including normal execution results and kill results)
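For the first stage, the master side of points 1 and 3 could be sketched in Python like this: each EXECUTE or KILL command carries a request id, the worker echoes that id in its response, and the master uses it to match results to requests. `transport` is an assumed send function, and the class names and message fields are hypothetical.

```python
import threading


class PendingResult:
    """Holds the outcome of one request until the worker's response arrives."""

    def __init__(self):
        self._done = threading.Event()
        self._status = None

    def complete(self, status):
        self._status = status
        self._done.set()

    def wait(self, timeout=None):
        if not self._done.wait(timeout):
            raise TimeoutError("no response from worker")
        return self._status


class MasterDispatcher:
    """Master-side bookkeeping for EXECUTE and KILL commands."""

    def __init__(self, transport):
        self._transport = transport    # assumed: transport(worker, message)
        self._lock = threading.Lock()
        self._next_id = 1
        self._pending = {}

    def _send(self, worker, kind, task_id):
        pending = PendingResult()
        with self._lock:
            request_id = self._next_id
            self._next_id += 1
            self._pending[request_id] = pending
        self._transport(worker, {"id": request_id, "kind": kind, "taskId": task_id})
        return pending

    def execute(self, worker, task_id):
        return self._send(worker, "EXECUTE", task_id)

    def kill(self, worker, task_id):
        return self._send(worker, "KILL", task_id)

    def on_response(self, message):
        """Called by the network layer for each response frame from a worker."""
        with self._lock:
            pending = self._pending.pop(message["id"], None)
        if pending is not None:            # unknown or duplicate ids are ignored
            pending.complete(message["status"])


outbox = []
dispatcher = MasterDispatcher(lambda worker, message: outbox.append((worker, message)))
run = dispatcher.execute("worker-1", 10)
stop = dispatcher.kill("worker-1", 11)
# Simulated worker responses, arriving in a different order than the requests.
dispatcher.on_response({"id": outbox[1][1]["id"], "status": "KILLED"})
dispatcher.on_response({"id": outbox[0][1]["id"], "status": "SUCCESS"})
print(run.wait(timeout=1), stop.wait(timeout=1))   # SUCCESS KILLED
```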

    New features to consider:
    1, the master needs to select a worker to execute each task instead of workers preempting a ZK lock, so we need a worker selection algorithm
    2, if a worker's task completes but its master is down: at present, other masters perform fault tolerance and take over the process instances,
    but workers need to be aware of the other masters, and in the current architecture workers are not aware of the masters at all.
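For point 1, round robin within a worker group is one simple selection strategy; others (random, lowest load) would fit the same interface. This Python sketch is an example rather than a settled design, and `RoundRobinSelector` is a hypothetical name.

```python
import threading


class RoundRobinSelector:
    """Round-robin worker selection per worker group (one possible strategy)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counters = {}

    def select(self, group, workers):
        """Pick the next worker from `workers`, the current live list for `group`."""
        if not workers:
            raise LookupError(f"no live worker in group {group!r}")
        with self._lock:
            n = self._counters.get(group, 0)
            self._counters[group] = n + 1
        return workers[n % len(workers)]


selector = RoundRobinSelector()
print([selector.select("test", ["w1", "w2", "w3"]) for _ in range(4)])
# ['w1', 'w2', 'w3', 'w1']
```

The `workers` argument would come from the worker list the master caches from ZooKeeper, so no ZK lock is involved in choosing a worker.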

    thx
―――――――――――――
DolphinScheduler(Incubator)  PPMC
Zhanwei Qiao 乔占卫

qiaozhanwei@outlook.com


Re: A proposal for DolphinScheduler- refactor WorkerServer/MasterServer

Posted by guo jiwei <te...@gmail.com>.
Sorry for not attaching the image.
