You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by lg...@apache.org on 2020/01/09 08:58:16 UTC
[incubator-dolphinscheduler] branch dev updated: fix #1775 bug,delete process definition when process instance is running (#1776)
This is an automated email from the ASF dual-hosted git repository.
lgcareer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b9ac6d6 fix #1775 bug,delete process definition when process instance is running (#1776)
b9ac6d6 is described below
commit b9ac6d66f1d5e0a2d170278630eac267bb473be1
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Thu Jan 9 16:58:05 2020 +0800
fix #1775 bug,delete process definition when process instance is running (#1776)
* remove LogViewServiceGrpc.java file and pom modify
* remove kazoo
* remove kazoo
* remove kazoo
* remove common monitor package
* add license
* remove kazoo modify
* remove kazoo modify
* remove kazoo modify
* remove kazoo modify
* remove kazoo modify
* remove kazoo modify
* install.sh remove python kazoo
* add system param whether repeat running
* remove kazoo modify
* BusinessTimeUtils remove whther repeat running inner param
* add AccessTokenMapperTest UT
* CI UT yml modify,start postgresql and zookeeper by default
* add AlertGroupMapperTest UT in github action
* Conflicts reslove
* AlertMappert UT modify
* AlertMappert UT modify
* AlertMappert UT modify
* CommandMapperTest UT modify
* DataSourceMapperTest UT modify
* fix #1775 bug,delete process definition when process instance is running
* verifyTenantIsNull remove process definnition id print log
---
.../apache/dolphinscheduler/dao/ProcessDao.java | 5 +++
.../server/worker/runner/FetchTaskThread.java | 41 ++++++++++++++++++----
2 files changed, 40 insertions(+), 6 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
index ffb6feb..a45fbff 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
@@ -457,6 +457,11 @@ public class ProcessDao {
if(tenantId >= 0){
tenant = tenantMapper.queryById(tenantId);
}
+
+ if (userId == 0){
+ return null;
+ }
+
if(null == tenant){
User user = userMapper.selectById(userId);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
index 4899f4b..221ad06 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -139,6 +140,7 @@ public class FetchTaskThread implements Runnable{
logger.info("worker start fetch tasks...");
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
+ String currentTaskQueueStr = null;
try {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
//check memory and cpu usage and threads
@@ -165,6 +167,9 @@ public class FetchTaskThread implements Runnable{
List<String> taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);
for(String taskQueueStr : taskQueueStrArr){
+
+ currentTaskQueueStr = taskQueueStr;
+
if (StringUtils.isEmpty(taskQueueStr)) {
continue;
}
@@ -184,7 +189,7 @@ public class FetchTaskThread implements Runnable{
// verify task instance is null
if (verifyTaskInstanceIsNull(taskInstance)) {
logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
- removeNodeFromTaskQueue(taskQueueStr);
+ processErrorTask(taskQueueStr);
continue;
}
@@ -192,13 +197,17 @@ public class FetchTaskThread implements Runnable{
continue;
}
- Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
- taskInstance.getProcessDefine().getUserId());
+ // if process definition is null ,process definition already deleted
+ int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
+
+ Tenant tenant = processDao.getTenantForProcess(
+ taskInstance.getProcessInstance().getTenantId(),
+ userId);
// verify tenant is null
if (verifyTenantIsNull(tenant)) {
logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
- removeNodeFromTaskQueue(taskQueueStr);
+ processErrorTask(taskQueueStr);
continue;
}
@@ -232,6 +241,7 @@ public class FetchTaskThread implements Runnable{
}
}catch (Exception e){
+ processErrorTask(currentTaskQueueStr);
logger.error("fetch task thread failure" ,e);
}finally {
AbstractZKClient.releaseMutex(mutex);
@@ -240,6 +250,26 @@ public class FetchTaskThread implements Runnable{
}
/**
+ * process error task
+ *
+ * @param taskQueueStr task queue str
+ */
+ private void processErrorTask(String taskQueueStr){
+ // remove from zk
+ removeNodeFromTaskQueue(taskQueueStr);
+
+ if (taskInstance != null){
+ processDao.changeTaskState(ExecutionStatus.FAILURE,
+ taskInstance.getStartTime(),
+ taskInstance.getHost(),
+ null,
+ null,
+ taskInstId);
+ }
+
+ }
+
+ /**
* remove node from task queue
*
* @param taskQueueStr task queue
@@ -269,8 +299,7 @@ public class FetchTaskThread implements Runnable{
*/
private boolean verifyTenantIsNull(Tenant tenant) {
if(tenant == null){
- logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}",
- taskInstance.getProcessDefine().getId(),
+ logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;