You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/01/09 11:20:26 UTC

[incubator-dolphinscheduler] branch dev-1.2.1 updated: fix #1775 bug,delete process definition when process instance is running (#1790)

This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch dev-1.2.1
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev-1.2.1 by this push:
     new 2149833  fix #1775 bug,delete process definition when process instance is running (#1790)
2149833 is described below

commit 214983323c32e6e6ac5c58252592a89e331ee6a5
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Thu Jan 9 19:20:15 2020 +0800

    fix #1775 bug,delete process definition when process instance is running (#1790)
    
    * fix #1775 bug,delete process definition when process instance is running
    
    * revert CONTRIBUTING.md
---
 .../apache/dolphinscheduler/dao/ProcessDao.java    |  5 +++
 .../server/worker/runner/FetchTaskThread.java      | 37 +++++++++++++++++++---
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
index aaa5fa7..33fd21d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
@@ -455,6 +455,11 @@ public class ProcessDao {
         if(tenantId >= 0){
             tenant = tenantMapper.queryById(tenantId);
         }
+
+        if (userId == 0){
+            return null;
+        }
+
         if(tenant == null){
             User user = userMapper.selectById(userId);
             tenant = tenantMapper.queryById(user.getTenantId());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
index c70eab6..656fc3b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.queue.ITaskQueue;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -143,6 +144,7 @@ public class FetchTaskThread implements Runnable{
         logger.info("worker start fetch tasks...");
         while (Stopper.isRunning()){
             InterProcessMutex mutex = null;
+            String currentTaskQueueStr = null;
             try {
                 ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
                 //check memory and cpu usage and threads
@@ -168,6 +170,9 @@ public class FetchTaskThread implements Runnable{
                 List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);
 
                 for(String taskQueueStr : taskQueueStrArr){
+
+                    currentTaskQueueStr = taskQueueStr;
+
                     if (StringUtils.isEmpty(taskQueueStr)) {
                         continue;
                     }
@@ -191,13 +196,16 @@ public class FetchTaskThread implements Runnable{
                         continue;
                     }
 
-                    Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
-                            taskInstance.getProcessDefine().getUserId());
+                    // if the process definition is null, it has already been deleted; fall back to userId 0
+                    int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
+                    Tenant tenant = processDao.getTenantForProcess(
+                            taskInstance.getProcessInstance().getTenantId(),
+                            userId);
 
                     // verify tenant is null
                     if (verifyTenantIsNull(tenant)) {
                         logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
-                        removeNodeFromTaskQueue(taskQueueStr);
+                        processErrorTask(taskQueueStr);
                         continue;
                     }
 
@@ -236,6 +244,7 @@ public class FetchTaskThread implements Runnable{
                 }
 
             }catch (Exception e){
+                processErrorTask(currentTaskQueueStr);
                 logger.error("fetch task thread failure" ,e);
             }finally {
                 AbstractZKClient.releaseMutex(mutex);
@@ -244,6 +253,25 @@ public class FetchTaskThread implements Runnable{
     }
 
     /**
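+     * process an error task: remove it from the task queue and, if a task instance is set, change its state to FAILURE
+     *
+     * @param taskQueueStr task queue string of the task to remove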
+     */
+    private void processErrorTask(String taskQueueStr){
+        // remove from zk
+        removeNodeFromTaskQueue(taskQueueStr);
+
+        if (taskInstance != null){
+            processDao.changeTaskState(ExecutionStatus.FAILURE,
+                    taskInstance.getStartTime(),
+                    taskInstance.getHost(),
+                    null,
+                    null,
+                    taskInstId);
+        }
+
+    }
+    /**
      * remove node from task queue
      *
      * @param taskQueueStr task queue
@@ -273,8 +301,7 @@ public class FetchTaskThread implements Runnable{
      */
     private boolean verifyTenantIsNull(Tenant tenant) {
         if(tenant == null){
-            logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}",
-                    taskInstance.getProcessDefine().getId(),
+            logger.error("tenant not exists,process instance id : {},task instance id : {}",
                     taskInstance.getProcessInstance().getId(),
                     taskInstance.getId());
             return true;