Posted to commits@dolphinscheduler.apache.org by gi...@apache.org on 2020/03/11 02:47:08 UTC

[incubator-dolphinscheduler-website] branch asf-site updated: Automated deployment: Wed Mar 11 02:47:01 UTC 2020 b96345612225fc9646e0b11f98802ba898b8217c

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 2120623  Automated deployment: Wed Mar 11 02:47:01 UTC 2020 b96345612225fc9646e0b11f98802ba898b8217c
2120623 is described below

commit 21206236fa67ed4932eb8ac5277185a22c8e9972
Author: dailidong <da...@users.noreply.github.com>
AuthorDate: Wed Mar 11 02:47:01 2020 +0000

    Automated deployment: Wed Mar 11 02:47:01 UTC 2020 b96345612225fc9646e0b11f98802ba898b8217c
---
 .../1.2.0/user_doc/masterserver-code-analysis.html | 383 +++++++++++++++++++++
 .../1.2.0/user_doc/masterserver-code-analysis.json |   6 +
 2 files changed, 389 insertions(+)

diff --git a/zh-cn/docs/1.2.0/user_doc/masterserver-code-analysis.html b/zh-cn/docs/1.2.0/user_doc/masterserver-code-analysis.html
new file mode 100644
index 0000000..b09e7fe
--- /dev/null
+++ b/zh-cn/docs/1.2.0/user_doc/masterserver-code-analysis.html
@@ -0,0 +1,383 @@
+<!DOCTYPE html>
+<html lang="en">
+
+<head>
+	<meta charset="UTF-8">
+	<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
+	<meta name="keywords" content="masterserver-code-analysis" />
+	<meta name="description" content="masterserver-code-analysis" />
+	<!-- Page tab title -->
+	<title>masterserver-code-analysis</title>
+	<link rel="shortcut icon" href="/img/docsite.ico"/>
+	<link rel="stylesheet" href="/build/documentation.css" />
+</head>
+<body>
+	<div id="root"><div class="documentation-page" data-reactroot=""><header class="header-container header-container-normal"><div class="header-body"><a href="/zh-cn/index.html"><img class="logo" src="/img/hlogo_colorful.svg"/></a><div class="search search-normal"><span class="icon-search"></span></div><span class="language-switch language-switch-normal">En</span><div class="header-menu"><img class="header-menu-toggle" src="/img/system/menu_gray.png"/><div><ul class="ant-menu blackClass an [...]
+<ul>
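+<li>ZooKeeper node initialization</li>
+<li>Building and submitting workflow instances and tracking their run state</li>
+<li>Monitoring the health of other MasterServers and WorkerServers and handling failover</li>
+<li>Maintaining the heartbeat</li>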
+</ul>
+<pre><code>@PostConstruct
+public void run(){
+        //See 1. ZooKeeper initialization
+        zkMasterClient.init(); 
+        //See 2. MasterSchedulerThread
+        masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor(&quot;Master-Scheduler-Thread&quot;);
+        //See 3. heartBeatThread
+        heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor(&quot;Master-Main-Thread&quot;,Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM);
+}
+</code></pre>
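+<h1>1. ZooKeeper initialization</h1>
+<p>Creates the DolphinScheduler (DS) znodes in ZooKeeper, decides whether the system needs a failover, and recovers abnormal workflow instances and task instances.</p>
+<ul>
+<li>Lock node used for master failover: /dolphinscheduler/lock/failover/master</li>
+<li>System nodes that store master and worker heartbeat information: /dolphinscheduler/masters; /dolphinscheduler/workers; /dolphinscheduler/dead-servers</li>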
+</ul>
+<pre><code>public void init(){
+	logger.info(&quot;initialize master client...&quot;);
+	this.initDao();
+	InterProcessMutex mutex = null;
+	try {
+	    //Create the distributed lock node used for master failover
+		String znodeLock = getMasterStartUpLockPath();
+		mutex = new InterProcessMutex(zkClient, znodeLock);
+		mutex.acquire();
+		// Initialize the system znodes in ZK
+		this.initSystemZNode();
+		// Register the current master under the /masters node in ZK
+		this.registerMaster();
+		// Fault tolerance is driven by watching ZooKeeper ephemeral node changes (if this is the only active master, run failover now)
+		if (getActiveMasterNum() == 1) { 
+			failoverWorker(null, true);  //recover task instances, see 1.1.
+			failoverMaster(null);   //recover workflow instances, see 1.2.
+		}
+	}catch (Exception e){
+		logger.error(&quot;master start up  exception&quot;,e); 
+	}finally {
+		releaseMutex(mutex);
+	}
+}
+</code></pre>
+<h2>1.1. failoverWorker: recovering task instances</h2>
+<pre><code>private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
+   logger.info(&quot;start worker[{}] failover ...&quot;, workerHost);
+
+   List&lt;TaskInstance&gt; needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
+   for(TaskInstance taskInstance : needFailoverTaskInstanceList){
+      if(needCheckWorkerAlive){
+         if(!checkTaskInstanceNeedFailover(taskInstance)){
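+             //Two cases in which no failover is needed:
+             // 1. The task details contain no host information
+             // 2. The task's worker is still registered in ZK and the task started after that worker started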
+            continue;
+               }
+      }
+
+      ProcessInstance instance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+      if(instance!=null){
+         taskInstance.setProcessInstance(instance);
+      }
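+      // If the task launched YARN jobs, kill them: match the containId pattern in the log with a regex to get the ID, then kill it with the yarn command.
+      ProcessUtils.killYarnJob(taskInstance);
+      //Change the task state from &quot;running&quot; to &quot;need fault tolerance&quot;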
+      taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
+      processService.saveTaskInstance(taskInstance);
+   }
+   logger.info(&quot;end worker[{}] failover ...&quot;, workerHost);
+}
+</code></pre>
+<h2>1.2. failoverMaster: recovering workflow instances</h2>
+<pre><code>private void failoverMaster(String masterHost) {
+   logger.info(&quot;start master failover ...&quot;);
+  //Fetch the workflow instances that need failover
+   List&lt;ProcessInstance&gt; needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
+   
+   for(ProcessInstance processInstance : needFailoverProcessInstanceList){
+       // 1. Set the host of the workflow instance to null
+       // 2. Insert a command into the t_ds_command table to recover the workflow instance
+      processService.processNeedFailoverProcessInstances(processInstance);
+   }
+
+   logger.info(&quot;master failover end&quot;);
+}
+</code></pre>
+<h1>2. MasterSchedulerThread</h1>
+<p>This thread mainly parses commands and turns them into workflow instances.</p>
+<pre><code>public void run() {
+    logger.info(&quot;master scheduler start successfully...&quot;);
+    while (Stopper.isRunning()){
+
+        // process instance
+        ProcessInstance processInstance = null;
+
+        InterProcessMutex mutex = null;
+        try {
+
+            boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
+            if(!runCheckFlag) {
+                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                continue;
+            }
+            if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
+
+                //Create the distributed lock /dolphinscheduler/lock/masters
+                String znodeLock = zkMasterClient.getMasterLockPath();
+
+                mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
+                mutex.acquire();
+
+                ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
+                int activeCount = poolExecutor.getActiveCount();
+                // Building and saving the instance and deleting the command row must happen in a single transaction
+                Command command = processService.findOneCommand();
+                if (command != null) {
+                    logger.info(&quot;find one command: id: {}, type: {}&quot;, command.getId(),command.getCommandType());
+
+                    try{
+                        // handleCommand parses the command into a processInstance, see 2.1
+                        processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
+                        if (processInstance != null) {
+                            logger.info(&quot;start master exec thread , split DAG ...&quot;);
+                            // masterExecService runs the master execution thread, see 2.2
+                            masterExecService.execute(new MasterExecThread(processInstance, processService));
+                        }
+                    }catch (Exception e){
+                        logger.error(&quot;scan command error &quot;, e);
+                        processService.moveToErrorCommand(command, e.toString());
+                    }
+                } else{
+                    //indicate that no command ,sleep for 1s
+                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                }
+            }
+        }catch (Exception e){
+            logger.error(&quot;master scheduler thread exception&quot;,e);
+        }finally{
+            AbstractZKClient.releaseMutex(mutex);
+        }
+    }
+    logger.info(&quot;master server stopped...&quot;);
+}
+</code></pre>
+<h2>2.1. handleCommand</h2>
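+<p>Builds a workflow instance from the command object and then deletes that command from the t_ds_command table. Building and saving the instance and deleting the command row must happen in a single transaction.</p>
+<p>All command types are listed below:</p>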
+<ul>
+<li>0 start a new process</li>
+<li>1 start a new process from current nodes</li>
+<li>2 recover tolerance fault process</li>
+<li>3 recover suspended process</li>
+<li>4 start process from failure task nodes</li>
+<li>5 complement data</li>
+<li>6 start a new process from scheduler</li>
+<li>7 repeat running a process</li>
+<li>8 pause a process</li>
+<li>9 stop a process</li>
+<li>10 recover waiting thread</li>
+</ul>
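+<p>As a rough illustration, the code-to-meaning mapping in the list above could be modelled as an enum with a lookup by numeric code. This is only a sketch: the enum name, the constant names and the fromCode helper are hypothetical and are not taken from the DolphinScheduler source; only the codes and descriptions come from the list.</p>

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical enum: constant names are illustrative, codes and descriptions mirror the list above.
enum CommandKind {
    START_NEW(0, "start a new process"),
    START_FROM_CURRENT_NODES(1, "start a new process from current nodes"),
    RECOVER_FAULT_TOLERANCE(2, "recover tolerance fault process"),
    RECOVER_SUSPENDED(3, "recover suspended process"),
    START_FROM_FAILED_NODES(4, "start process from failure task nodes"),
    COMPLEMENT_DATA(5, "complement data"),
    START_FROM_SCHEDULER(6, "start a new process from scheduler"),
    REPEAT_RUNNING(7, "repeat running a process"),
    PAUSE(8, "pause a process"),
    STOP(9, "stop a process"),
    RECOVER_WAITING_THREAD(10, "recover waiting thread");

    // Lookup table built once, so fromCode does not scan values() on every call.
    private static final Map<Integer, CommandKind> BY_CODE =
            Arrays.stream(values()).collect(Collectors.toMap(CommandKind::getCode, Function.identity()));

    private final int code;
    private final String description;

    CommandKind(int code, String description) {
        this.code = code;
        this.description = description;
    }

    int getCode() { return code; }

    String getDescription() { return description; }

    // Resolve a numeric code, rejecting unknown codes explicitly.
    static CommandKind fromCode(int code) {
        CommandKind kind = BY_CODE.get(code);
        if (kind == null) {
            throw new IllegalArgumentException("unknown command code: " + code);
        }
        return kind;
    }
}

class CommandKindDemo {
    public static void main(String[] args) {
        CommandKind kind = CommandKind.fromCode(5);
        System.out.println(kind + " = " + kind.getDescription()); // prints "COMPLEMENT_DATA = complement data"
    }
}
```

+<p>Rejecting unknown codes with an exception, rather than returning null, keeps a malformed command row from silently flowing further into scheduling.</p>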
+<pre><code>@Transactional(rollbackFor = Exception.class)
+public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
+     //Build a new workflow instance from the command
+     ProcessInstance processInstance = constructProcessInstance(command, host);
+    //cannot construct process instance, return null;
+    if(processInstance == null){
+        logger.error(&quot;scan command, command parameter is error: {}&quot;, command.toString());
+        moveToErrorCommand(command, &quot;process instance is null&quot;);
+        return null;
+    }
+    if(!checkThreadNum(command, validThreadNum)){
+        logger.info(&quot;there is not enough thread for this command: {}&quot;,command.toString() );
+        return setWaitingThreadProcess(command, processInstance);
+    }
+    processInstance.setCommandType(command.getCommandType());
+    processInstance.addHistoryCmd(command.getCommandType());
+    saveProcessInstance(processInstance);
+    this.setSubProcessParam(processInstance);
+    //Delete the command once the workflow instance has been saved
+    delCommandByid(command.getId());
+    return processInstance;
+}
+</code></pre>
+<h2>2.2. MasterExecThread: the execution thread</h2>
+<pre><code>public void run() {
+    ......
+
+    try {
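+        //Check whether this is a complement-data run and the process instance is not a sub-process
+        if (processInstance.isComplementData() &amp;&amp;  Flag.NO == processInstance.getIsSubProcess()){
+            // See 2.2.2. Running complement data
+            executeComplementProcess();  
+        }else{
+            //See 2.2.1. Running the process instance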
+            executeProcess(); 
+        }
+    ......
+}
+</code></pre>
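+<h3>2.2.1. executeProcess(): running the process instance</h3>
+<pre><code>private void executeProcess() throws Exception {
+    //1. Look up the list of valid tasks by process instance id: initTaskQueue()
+    //2. Build the DAG: buildFlowDag() returns a DAG object holding two things: vertices (the task nodes to execute) and edges (the dependencies between tasks)
+    prepareProcess();
+    //Submit and monitor tasks until the workflow stops, see 2.2.1.1
+    runProcess();
+    //When the thread pool is too small for the process instance, create a recover-waiting-thread command.
+    //Sub-process instances do not need a recovery command.
+    //Creating the recover-waiting-thread command also deletes the origin command.
+    //If a recovery command already exists, only its update_time field is updated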
+    endProcess();
+}
+</code></pre>
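+<h4>2.2.1.1. runProcess(): submitting and monitoring tasks</h4>
+<p>submitPostNode takes the name of the parent task node. Using that name and the DAG, it obtains the list of task nodes and builds the list of task instances, readyToSubmitTaskList.</p>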
+<pre><code>private void runProcess(){
+    submitPostNode(null);
+</code></pre>
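+<p>submitStandByTask() iterates over readyToSubmitTaskList and checks each task instance's dependencies: if the dependencies ran successfully, the task is submitted to a task execution thread; if they failed, the current node's state is set to failed.</p>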
+<pre><code>
+       if(canSubmitTaskToQueue()){
+           submitStandByTask();
+       }
+       try {
+           Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+       } catch (InterruptedException e) {
+           logger.error(e.getMessage(),e);
+       }
+       updateProcessInstanceState();
+   }
+
+   logger.info(&quot;process:{} end, state :{}&quot;, processInstance.getId(), processInstance.getState());
+}
+</code></pre>
+<p>submitStandByTask() eventually calls submitTaskExec, which brings in the MasterBaseTaskExecThread thread.
+MasterBaseTaskExecThread has two main jobs:</p>
+<ul>
+<li>Saving the task instance information to the database via submitTask()</li>
+<li>Writing the task information into the ZooKeeper queue via submitTaskToQueue(), where a worker later claims the task. (Node naming format: {processInstancePriority}_{processInstanceId}_{taskInstancePriority}_{taskInstanceId}_{task executed by ip1}, [...]
+</ul>
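+<p>To make the node naming format from the list above concrete, here is a minimal sketch that joins the four leading fields with underscores and splits them back apart. The class and method names are hypothetical, and the trailing executor-host portion of the name is deliberately left out because it is not fully shown in this page.</p>

```java
// Hypothetical helper, not the DolphinScheduler implementation: it only handles
// the four leading numeric fields of the queue node name.
class TaskQueueKey {

    // Build "{processInstancePriority}_{processInstanceId}_{taskInstancePriority}_{taskInstanceId}".
    static String build(int processInstancePriority, int processInstanceId,
                        int taskInstancePriority, int taskInstanceId) {
        return String.join("_",
                String.valueOf(processInstancePriority),
                String.valueOf(processInstanceId),
                String.valueOf(taskInstancePriority),
                String.valueOf(taskInstanceId));
    }

    // Split a key back into its four numeric fields, ignoring anything after the fourth.
    static int[] parse(String key) {
        String[] parts = key.split("_");
        if (parts.length < 4) {
            throw new IllegalArgumentException("expected at least 4 fields: " + key);
        }
        int[] fields = new int[4];
        for (int i = 0; i < 4; i++) {
            fields[i] = Integer.parseInt(parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        String key = build(2, 101, 1, 5001);
        System.out.println(key); // prints "2_101_1_5001"
        System.out.println(parse(key)[3]); // prints "5001"
    }
}
```

+<p>Putting the two priorities ahead of the ids means that whoever reads the queue can order entries by priority from the node name alone, without fetching the task details first.</p>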
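+<p>MasterBaseTaskExecThread also has two subclasses, which add the following on top of the two jobs above:</p>
+<ul>
+<li>MasterTaskExecThread: after the task finishes, it writes the information of tasks that need to be killed into the zk queue, where a worker picks it up and kills them.</li>
+<li>SubProcessTaskExecThread: after the current workflow finishes running, it goes on to run the sub-workflow and updates the related state; the state is synchronized to the sub-workflow's state only once the sub-workflow has fully completed.</li>
+</ul>
+<p>MasterBaseTaskExecThread is submitted asynchronously, and its result is recorded in activeTaskNode.</p>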
+<pre><code>    private TaskInstance submitTaskExec(TaskInstance taskInstance) {
+        MasterBaseTaskExecThread abstractExecThread = null;
+        if(taskInstance.isSubProcess()){
+            abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance);
+        }else {
+            abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance);
+        }
+        Future&lt;Boolean&gt; future = taskExecService.submit(abstractExecThread);
+        activeTaskNode.putIfAbsent(abstractExecThread, future);
+        return abstractExecThread.getTaskInstance();
+    }
+</code></pre>
+<p>activeTaskNode is then iterated: for each thread that has finished, its entry is removed and the node is checked for success.</p>
+<pre><code>   for(Map.Entry&lt;MasterBaseTaskExecThread,Future&lt;Boolean&gt;&gt; entry: activeTaskNode.entrySet()) {
+                Future&lt;Boolean&gt; future = entry.getValue();
+                TaskInstance task  = entry.getKey().getTaskInstance();
+
+                if(!future.isDone()){
+                    continue;
+                }
+                // node monitor thread complete
+                activeTaskNode.remove(entry.getKey());
+                if(task == null){
+                    this.taskFailedSubmit = true;
+                    continue;
+                }
+                logger.info(&quot;task :{}, id:{} complete, state is {} &quot;,
+                        task.getName(), task.getId(), task.getState().toString());
+                // If the node succeeded, go on to submit the task nodes that follow it
+                if(task.getState() == ExecutionStatus.SUCCESS){
+                    completeTaskList.put(task.getName(), task);
+                    submitPostNode(task.getName());
+                    continue;
+                }
+                // If the node failed, retry first, then continue with the failure flow
+                if(task.getState().typeIsFailure()){
+                    if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
+                        this.recoverToleranceFaultTaskList.add(task);
+                    }
+                    if(task.taskCanRetry()){
+                        addTaskToStandByList(task);
+                    }else{
+                        completeTaskList.put(task.getName(), task);
+                        if( task.getTaskType().equals(TaskType.CONDITIONS.toString()) ||
+                                haveConditionsAfterNode(task.getName())) {
+                            submitPostNode(task.getName());
+                        }else{
+                            errorTaskList.put(task.getName(), task);
+                            if(processInstance.getFailureStrategy() == FailureStrategy.END){
+                                killTheOtherTasks();
+                            }
+                        }
+                    }
+                    continue;
+                }
+                // other status stop/pause
+                completeTaskList.put(task.getName(), task);
+            }
+            // send alert
+</code></pre>
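+<p>The polling pattern in the loop above (submit asynchronously, keep each Future in a map, remove entries once isDone() is true) can be tried out in isolation. The following is a simplified, self-contained sketch using only java.util.concurrent; the class name, the String task names and the Boolean results are stand-ins for MasterBaseTaskExecThread and TaskInstance and are not part of the original code.</p>

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified stand-in for the activeTaskNode bookkeeping; all names are hypothetical.
class FuturePollingDemo {

    // Submits every task, then polls the futures until all are done.
    // Returns task name -> success flag (a task that throws counts as failed).
    static Map<String, Boolean> runAndCollect(Map<String, Callable<Boolean>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Map<String, Future<Boolean>> active = new ConcurrentHashMap<>();
        Map<String, Boolean> completed = new ConcurrentHashMap<>();
        try {
            tasks.forEach((name, task) -> active.putIfAbsent(name, pool.submit(task)));
            while (!active.isEmpty()) {
                // ConcurrentHashMap iterators tolerate removal during iteration.
                for (Map.Entry<String, Future<Boolean>> entry : active.entrySet()) {
                    Future<Boolean> future = entry.getValue();
                    if (!future.isDone()) {
                        continue; // still running, check again on the next pass
                    }
                    active.remove(entry.getKey());
                    boolean ok;
                    try {
                        ok = future.get();
                    } catch (ExecutionException e) {
                        ok = false; // the task itself threw
                    }
                    completed.put(entry.getKey(), ok);
                }
                Thread.sleep(10); // avoid a busy loop between polls
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop polling but preserve the interrupt flag
        } finally {
            pool.shutdownNow();
        }
        return completed;
    }

    public static void main(String[] args) {
        Map<String, Callable<Boolean>> tasks = new ConcurrentHashMap<>();
        tasks.put("extract", () -> true);
        tasks.put("load", () -> { throw new IllegalStateException("boom"); });
        System.out.println(runAndCollect(tasks));
    }
}
```

+<p>Polling with isDone() keeps the monitoring loop from blocking on any single task, at the cost of a short sleep between passes.</p>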
+<h3>2.2.2. executeComplementProcess(): running a complement-data process instance</h3>
+<pre><code>private void executeComplementProcess() throws Exception {
+....
+//Compute the list of dates to complement from the schedule's time rule and the complement date range
+int processDefinitionId = processInstance.getProcessDefinitionId();
+List&lt;Schedule&gt; schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
+List&lt;Date&gt; listDate = Lists.newLinkedList();
+if(!CollectionUtils.isEmpty(schedules)){
+    for (Schedule schedule : schedules) {
+        listDate.addAll(CronUtils.getSelfFireDateList(startDate, endDate, schedule.getCrontab()));
+    }
+}
+//What follows is a loop that runs once for each date in the list
+//The three methods below are the same as in 2.2.1
+....
+prepareProcess();
+....
+runProcess();
+....
+endProcess();
+</code></pre>
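+<p>The shape of the complement run described above (compute a list of dates, then run prepare, run and end once per date) can be sketched without any DolphinScheduler classes. In this sketch the cron expansion done by CronUtils.getSelfFireDateList is replaced by a plain once-per-day schedule with an inclusive end date; both are assumptions made for illustration, and all names are hypothetical.</p>

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the per-date loop; not the DolphinScheduler implementation.
class ComplementDatesDemo {

    // Stand-in for the cron expansion: assumes one fire date per day, end date inclusive.
    static List<LocalDate> dailyFireDates(LocalDate start, LocalDate end) {
        List<LocalDate> dates = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            dates.add(d);
        }
        return dates;
    }

    // Runs the three phases once per date and records them in order.
    static List<String> runComplement(LocalDate start, LocalDate end) {
        List<String> log = new ArrayList<>();
        for (LocalDate date : dailyFireDates(start, end)) {
            log.add("prepare " + date); // corresponds to prepareProcess()
            log.add("run " + date);     // corresponds to runProcess()
            log.add("end " + date);     // corresponds to endProcess()
        }
        return log;
    }

    public static void main(String[] args) {
        runComplement(LocalDate.of(2020, 3, 1), LocalDate.of(2020, 3, 3))
                .forEach(System.out::println);
    }
}
```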
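+<h1>3. heartBeatThread</h1>
+<p>Reports heartbeat information every 30 seconds
+and, at the same time, checks whether the host is listed under the dead-servers node, i.e. whether the process is already dead.
+If the process is healthy, it updates the node name under /dolphinscheduler/masters/${host}/ in ZooKeeper, which carries the following information:
+ip, port, cpuUsage, memoryUsage, loadAverage, registerTime, currentTime</p>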
+<pre><code>    private Runnable heartBeatThread(){
+        logger.info(&quot;start master heart beat thread...&quot;);
+        Runnable heartBeatThread  = new Runnable() {
+            @Override
+            public void run() {
+                if(Stopper.isRunning()) {
+                    // send heartbeat to zk
+                    if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
+                        logger.error(&quot;master send heartbeat to zk failed: can't find zookeeper path of master server&quot;);
+                        return;
+                    }
+
+                    zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
+                }
+            }
+        };
+        return heartBeatThread;
+    }
+</code></pre>
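+<p>How a Runnable like the one above ends up running at a fixed interval is not shown in this page. One common way, sketched here with the standard ScheduledExecutorService, is scheduleAtFixedRate. The class name, the millisecond period and the counting logic are illustrative assumptions; the real master uses its own ThreadUtils helper and a 30-second interval.</p>

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of periodic heartbeat scheduling; not the DolphinScheduler implementation.
class HeartbeatDemo {

    // Schedules a heartbeat every periodMillis, waits for the requested number of beats
    // (at most 5 seconds), and returns how many beats were observed.
    static int countBeats(long periodMillis, int beatsToWaitFor) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-demo");
            t.setDaemon(true); // do not keep the JVM alive just for heartbeats
            return t;
        });
        AtomicInteger beats = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(beatsToWaitFor);
        Runnable heartBeat = () -> {
            // A real heartbeat would write host metrics to the registry here.
            beats.incrementAndGet();
            latch.countDown();
        };
        scheduler.scheduleAtFixedRate(heartBeat, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return beats.get();
    }

    public static void main(String[] args) {
        System.out.println("beats observed: " + countBeats(20, 3));
    }
}
```

+<p>Note that scheduleAtFixedRate stops rescheduling a task that throws, so a production heartbeat would normally catch and log exceptions inside run().</p>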
+</div></section><footer class="footer-container"><div class="footer-body"><img src="/img/ds_gray.svg"/><div class="cols-container"><div class="col col-12"><h3>Disclaimer</h3><p>Apache DolphinScheduler (incubating) is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by Incubator. 
+Incubation is required of all newly accepted projects until a further review indicates 
+that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. 
+While incubation status is not necessarily a reflection of the completeness or stability of the code, 
+it does indicate that the project has yet to be fully endorsed by the ASF.</p></div><div class="col col-6"><dl><dt>Docs</dt><dd><a href="/zh-cn/docs/1.2.0/user_doc/architecture-design.html" target="_self">Overview</a></dd><dd><a href="/zh-cn/docs/1.2.0/user_doc/quick-start.html" target="_self">Quick Start</a></dd><dd><a href="/zh-cn/docs/1.2.0/user_doc/backend-development.html" target="_self">Developer Guide</a></dd></dl></div><div class="col col-6"><dl><dt>ASF</dt><dd><a href="http://www.apache.org" target="_se [...]
+	<script src="https://f.alicdn.com/react/15.4.1/react-with-addons.min.js"></script>
+	<script src="https://f.alicdn.com/react/15.4.1/react-dom.min.js"></script>
+	<script>
+		window.rootPath = '';
+  </script>
+	<script src="/build/documentation.js"></script>
+</body>
+</html>
\ No newline at end of file
diff --git a/zh-cn/docs/1.2.0/user_doc/masterserver-code-analysis.json b/zh-cn/docs/1.2.0/user_doc/masterserver-code-analysis.json
new file mode 100644
index 0000000..9e5e0c2
--- /dev/null
+++ b/zh-cn/docs/1.2.0/user_doc/masterserver-code-analysis.json
@@ -0,0 +1,6 @@
+{
+  "filename": "masterserver-code-analysis.md",
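+  "__html": "<p>This article walks through the source code of the DolphinScheduler master, starting from the main class MasterServer. From startup through normal operation, the master mainly does the following:</p>\n<ul>\n<li>ZooKeeper node initialization</li>\n<li>Building and submitting workflow instances and tracking their run state</li>\n<li>Monitoring the health of other MasterServers and WorkerServers and handling failover</li>\n<li>Maintaining the heartbeat</li>\n</ul>\n<pre><code>@PostConstruct\npublic void run(){\n        //See 1. ZooKeeper initialization\n        zkMasterClient.init(); \n        //See 2. MasterSchedulerThread\n        masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor(&quot;Master-Scheduler-Thread&quot;); [...]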
+  "link": "/zh-cn/docs/1.2.0/user_doc/masterserver-code-analysis.html",
+  "meta": {}
+}
\ No newline at end of file