You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/14 08:18:16 UTC
[dolphinscheduler] branch dev updated: Make LogServiceClient Singleton (#11777)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d3a77c68e6 Make LogServiceClient Singleton (#11777)
d3a77c68e6 is described below
commit d3a77c68e62b0c471e25cd6a4bfbd2c561dd2023
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Wed Sep 14 16:18:08 2022 +0800
Make LogServiceClient Singleton (#11777)
---
.../api/exceptions/ServiceException.java | 36 ++---
.../api/service/impl/ExecutorServiceImpl.java | 10 +-
.../api/service/impl/LoggerServiceImpl.java | 19 +--
.../service/impl/ProcessInstanceServiceImpl.java | 105 ++++++--------
.../api/service/impl/SchedulerServiceImpl.java | 10 +-
.../api/exceptions/ServiceExceptionTest.java | 14 +-
.../api/service/ExecutorServiceTest.java | 3 +-
.../api/service/LoggerServiceTest.java | 49 +++----
.../api/service/ProcessDefinitionServiceTest.java | 2 +-
.../api/service/ProcessInstanceServiceTest.java | 38 +++--
.../api/service/TaskInstanceServiceTest.java | 3 +-
.../server/log/LoggerRequestProcessor.java | 6 +-
.../master/service/MasterFailoverService.java | 12 +-
.../master/service/WorkerFailoverService.java | 31 ++---
.../master/utils/DataQualityResultOperator.java | 3 +-
.../master/registry/MasterRegistryClientTest.java | 17 +--
.../server/master/service/FailoverServiceTest.java | 18 ++-
.../remote/NettyRemotingClient.java | 104 ++++----------
.../remote/config/NettyClientConfig.java | 64 ++-------
.../server/utils/ProcessUtils.java | 13 +-
.../log/{LogClientService.java => LogClient.java} | 155 ++++++++++-----------
.../dolphinscheduler/service/log/LogPromise.java | 108 --------------
.../service/process/ProcessService.java | 3 +-
.../service/process/ProcessServiceImpl.java | 102 +++++++-------
...ogClientServiceTest.java => LogClientTest.java} | 33 ++---
.../plugin/task/api/AbstractRemoteTask.java | 4 +-
.../plugin/task/api/AbstractYarnTask.java | 20 +--
.../plugin/task/api/utils/LogUtils.java | 12 +-
.../plugin/task/api/utils/LogUtilsTest.java | 8 +-
.../plugin/task/dinky/DinkyTask.java | 39 +++---
.../plugin/task/emr/EmrAddStepsTask.java | 50 +++----
.../plugin/task/emr/EmrJobFlowTask.java | 6 +-
.../plugin/task/flink/FlinkStreamTask.java | 8 +-
.../plugin/task/flink/FlinkTask.java | 47 +------
.../plugin/task/hivecli/HiveCliTask.java | 6 +-
.../plugin/task/jupyter/JupyterTask.java | 10 +-
.../dolphinscheduler/plugin/task/k8s/K8sTask.java | 8 +-
.../plugin/task/pigeon/PigeonTask.java | 45 +++---
.../plugin/task/sagemaker/SagemakerTask.java | 6 +-
.../plugin/task/seatunnel/SeatunnelTask.java | 22 +--
.../plugin/task/zeppelin/ZeppelinTask.java | 14 +-
.../server/worker/processor/TaskKillProcessor.java | 49 ++++---
.../worker/runner/WorkerTaskExecuteRunnable.java | 8 +-
43 files changed, 518 insertions(+), 802 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java
index 369fbbec3e..2fa3e01a1c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java
@@ -20,48 +20,36 @@ import org.apache.dolphinscheduler.api.enums.Status;
import java.text.MessageFormat;
+import lombok.Data;
-/**
- * service exception
- */
+@Data
public class ServiceException extends RuntimeException {
- /**
- * code
- */
- private Integer code;
+ private int code;
public ServiceException() {
+ this(Status.INTERNAL_SERVER_ERROR_ARGS);
}
public ServiceException(Status status) {
- super(status.getMsg());
- this.code = status.getCode();
+ this(status.getCode(), status.getMsg());
}
public ServiceException(Status status, Object... formatter) {
- super(MessageFormat.format(status.getMsg(), formatter));
- this.code = status.getCode();
- }
-
- public ServiceException(Integer code,String message) {
- super(message);
- this.code = code;
+ this(status.getCode(), MessageFormat.format(status.getMsg(), formatter));
}
public ServiceException(String message) {
- super(message);
+ this(Status.INTERNAL_SERVER_ERROR_ARGS, message);
}
- public ServiceException(String message, Exception cause) {
- super(message, cause);
+ public ServiceException(int code, String message) {
+ this(code, message, null);
}
- public Integer getCode() {
- return this.code;
- }
-
- public void setCode(Integer code) {
+ public ServiceException(int code, String message, Exception cause) {
+ super(message, cause);
this.code = code;
}
+
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index b4fb3fb8aa..20b2d9604c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -31,6 +31,7 @@ import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LEN
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -366,11 +367,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
- ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
- if (processInstance == null) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
- return result;
- }
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
+ .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
@@ -1022,7 +1020,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*/
@Override
public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
- ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) {
return null;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index 39f9208b5f..518a03004a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -31,7 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
@@ -66,7 +66,8 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@Autowired
private ProcessService processService;
- private LogClientService logClient;
+ @Autowired
+ private LogClient logClient;
@Autowired
ProjectMapper projectMapper;
@@ -77,20 +78,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@Autowired
TaskDefinitionMapper taskDefinitionMapper;
- @PostConstruct
- public void init() {
- if (Objects.isNull(this.logClient)) {
- this.logClient = new LogClientService();
- }
- }
-
- @PreDestroy
- public void close() {
- if (Objects.nonNull(this.logClient) && this.logClient.isRunning()) {
- logClient.close();
- }
- }
-
/**
* view log
*
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index d6c6cc329f..cf98c5a3b4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -17,17 +17,10 @@
package org.apache.dolphinscheduler.api.service.impl;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
-import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
-import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
-import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
-import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
-
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status;
@@ -76,9 +69,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
@@ -95,12 +88,18 @@ import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
+import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR;
+import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
+import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
+import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
/**
* process instance service impl
@@ -226,7 +225,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
- ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId)
+ .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
@@ -339,17 +339,16 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
public Map<String, Object> queryTaskListByProcessId(User loginUser, long projectCode,
Integer processId) throws IOException {
Project project = projectMapper.queryByCode(projectCode);
- // check user access for project
- Map<String, Object> result =
- projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
+ //check user access for project
+ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
- ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
- ProcessDefinition processDefinition =
- processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId)
+ .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
+ ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId);
+ putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processId);
return result;
}
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
@@ -481,28 +480,23 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
String globalParams,
String locations, int timeout, String tenantCode) {
Project project = projectMapper.queryByCode(projectCode);
- // check user access for project
- Map<String, Object> result =
- projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE);
+ //check user access for project
+ Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
- // check process instance exists
- ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
- if (processInstance == null) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
- return result;
- }
- // check process instance exists in project
- ProcessDefinition processDefinition0 =
- processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+ //check process instance exists
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
+ .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
+ //check process instance exists in project
+ ProcessDefinition processDefinition0 = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
+ putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
- // check process instance status
+ //check process instance status
if (!processInstance.getState().isFinished()) {
- putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
+ putMsg(result, PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
@@ -620,11 +614,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
- ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId);
- if (subInstance == null) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId);
- return result;
- }
+ ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId)
+ .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, subId));
if (subInstance.getIsSubProcess() == Flag.NO) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName());
return result;
@@ -660,23 +651,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
- ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
- if (null == processInstance) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId));
- return result;
- }
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
+ .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
// check process instance status
if (!processInstance.getState().isFinished()) {
- putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
- processInstance.getName(), processInstance.getState().toString(), "delete");
- return result;
+ throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState(), "delete");
}
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId));
- return result;
+ throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
}
try {
@@ -722,7 +707,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
+ putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
@@ -811,7 +796,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
+ putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
GanttDto ganttDto = new GanttDto();
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 392543e788..677d580e2d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -78,9 +78,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.cronutils.model.Cron;
-/**
- * scheduler service impl
- */
@Service
public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerService {
@@ -382,9 +379,10 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
} catch (Exception e) {
- result.put(Constants.MSG,
- scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
- throw new ServiceException(result.get(Constants.MSG).toString(), e);
+ Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
+ : Status.OFFLINE_SCHEDULE_ERROR;
+ result.put(Constants.STATUS, status);
+ throw new ServiceException(status, e);
}
putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java
index a574253d1d..3bb6479ba9 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java
@@ -17,25 +17,27 @@
package org.apache.dolphinscheduler.api.exceptions;
import org.apache.dolphinscheduler.api.enums.Status;
+
import org.junit.Assert;
import org.junit.Test;
public class ServiceExceptionTest {
+
@Test
- public void getCodeTest(){
+ public void getCodeTest() {
ServiceException serviceException = new ServiceException();
- Assert.assertNull(serviceException.getCode());
+ Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), serviceException.getCode());
serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
- Assert.assertNotNull(serviceException.getCode());
+ Assert.assertEquals(Status.ALERT_GROUP_EXIST.getCode(), serviceException.getCode());
serviceException = new ServiceException(10012, "alarm group already exists");
- Assert.assertNotNull(serviceException.getCode());
+ Assert.assertEquals(10012, serviceException.getCode());
}
@Test
- public void getMessageTest(){
+ public void getMessageTest() {
ServiceException serviceException = new ServiceException();
- Assert.assertNull(serviceException.getMessage());
+ Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(), serviceException.getMessage());
serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
Assert.assertNotNull(serviceException.getMessage());
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 9affbd2c2f..2a2409e06a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -54,6 +54,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.junit.Assert;
import org.junit.Before;
@@ -179,7 +180,7 @@ public class ExecutorServiceTest {
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
- Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
+ Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(Optional.ofNullable(processInstance));
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue);
Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
index 4be8e246fc..1391ee0857 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.api.service;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
+
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -28,15 +31,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@@ -47,9 +49,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
-import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
-
/**
* logger service test
*/
@@ -74,10 +73,8 @@ public class LoggerServiceTest {
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
- @Before
- public void init() {
- this.loggerService.init();
- }
+ @Mock
+ private LogClient logClient;
@Test
public void testQueryDataSourceList() {
@@ -85,11 +82,11 @@ public class LoggerServiceTest {
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Result result = loggerService.queryLog(2, 1, 1);
- //TASK_INSTANCE_NOT_FOUND
+ // TASK_INSTANCE_NOT_FOUND
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
try {
- //HOST NOT FOUND OR ILLEGAL
+ // HOST NOT FOUND OR ILLEGAL
result = loggerService.queryLog(1, 1, 1);
} catch (RuntimeException e) {
Assert.assertTrue(true);
@@ -97,7 +94,7 @@ public class LoggerServiceTest {
}
Assert.assertEquals(Status.TASK_INSTANCE_HOST_IS_NULL.getCode(), result.getCode().intValue());
- //SUCCESS
+ // SUCCESS
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
@@ -111,7 +108,7 @@ public class LoggerServiceTest {
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
- //task instance is null
+ // task instance is null
try {
loggerService.getLogBytes(2);
} catch (RuntimeException e) {
@@ -119,7 +116,7 @@ public class LoggerServiceTest {
logger.error("testGetLogBytes error: {}", "task instance is null");
}
- //task instance host is null
+ // task instance host is null
try {
loggerService.getLogBytes(1);
} catch (RuntimeException e) {
@@ -127,11 +124,13 @@ public class LoggerServiceTest {
logger.error("testGetLogBytes error: {}", "task instance host is null");
}
- //success
+ // success
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
- //if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage,
+ // if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage,
// so no assert will be added here
+ Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
+ .thenReturn(new byte[0]);
loggerService.getLogBytes(1);
}
@@ -152,12 +151,12 @@ public class LoggerServiceTest {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
- //SUCCESS
+ // SUCCESS
taskInstance.setTaskCode(1L);
taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
- Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,VIEW_LOG)).thenReturn(result);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
@@ -179,22 +178,20 @@ public class LoggerServiceTest {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
- //SUCCESS
+ // SUCCESS
taskInstance.setTaskCode(1L);
taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
- Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,DOWNLOAD_LOG )).thenReturn(result);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG))
+ .thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
+ Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
+ .thenReturn(new byte[0]);
loggerService.getLogBytes(loginUser, projectCode, 1);
}
- @After
- public void close() {
- this.loggerService.close();
- }
-
/**
* get mock Project
*
@@ -218,4 +215,4 @@ public class LoggerServiceTest {
result.put(Constants.MSG, status.getMsg());
}
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index bfd8ecfa34..05c63ab537 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -203,7 +203,7 @@ public class ProcessDefinitionServiceTest {
.checkProjectAndAuthThrowException(loginUser, null, WORKFLOW_DEFINITION);
processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0);
} catch (ServiceException serviceException) {
- Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode().intValue());
+ Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode());
}
Map<String, Object> result = new HashMap<>();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index 4167673faa..48dd44e468 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
@@ -70,6 +71,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.junit.Assert;
import org.junit.Test;
@@ -308,7 +310,8 @@ public class ProcessInstanceServiceTest {
processDefinition.setProjectCode(projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
- when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
+ when(processService.findProcessInstanceDetailById(processInstance.getId()))
+ .thenReturn(Optional.of(processInstance));
when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition);
Map<String, Object> successRes = processInstanceService.queryProcessInstanceById(loginUser, projectCode, 1);
@@ -353,7 +356,8 @@ public class ProcessInstanceServiceTest {
res.setData("xxx");
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
- when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
+ when(processService.findProcessInstanceDetailById(processInstance.getId()))
+ .thenReturn(Optional.of(processInstance));
when(processService.findValidTaskListByProcessId(processInstance.getId())).thenReturn(taskInstanceList);
when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res);
Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
@@ -451,14 +455,18 @@ public class ProcessInstanceServiceTest {
ProcessInstance processInstance = getProcessInstance();
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE)).thenReturn(result);
- when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
- Map<String, Object> processInstanceNullRes =
- processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
- shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
- Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
+ when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
+ try {
+ Map<String, Object> processInstanceNullRes =
+ processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
+ shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
+ Assert.fail();
+ } catch (ServiceException ex) {
+ Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
+ }
// process instance not finish
- when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance);
+ when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceNotFinishRes =
@@ -523,16 +531,20 @@ public class ProcessInstanceServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
- when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
- Map<String, Object> processInstanceNullRes =
- processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
- Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
+ when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
+ try {
+ Map<String, Object> processInstanceNullRes =
+ processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
+
+ } catch (ServiceException ex) {
+ Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
+ }
// not sub process
ProcessInstance processInstance = getProcessInstance();
processInstance.setIsSubProcess(Flag.NO);
putMsg(result, Status.SUCCESS, projectCode);
- when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance);
+ when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
Map<String, Object> notSubProcessRes =
processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, notSubProcessRes.get(Constants.STATUS));
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index e1e783c2ac..dd06ce66ef 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -51,6 +51,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.junit.Assert;
import org.junit.Test;
@@ -158,7 +159,7 @@ public class TaskInstanceServiceTest {
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()))
- .thenReturn(processInstance);
+ .thenReturn(Optional.of(processInstance));
Result successRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "",
"test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20);
diff --git a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index d921d54000..0d04ed8596 100644
--- a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -46,10 +46,8 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@@ -168,9 +166,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
if (!checkPathSecurity(logPath)) {
throw new IllegalArgumentException("Illegal path");
}
- Set<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
+ List<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
channel.writeAndFlush(
- new GetAppIdResponseCommand(new ArrayList<>(appIds)).convert2Command(command.getOpaque()));
+ new GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque()));
break;
default:
throw new IllegalArgumentException("unknown commandType: " + commandType);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
index 9e1db701fa..bdb7289aad 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -38,6 +37,7 @@ import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -73,17 +73,21 @@ public class MasterFailoverService {
private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+ private final LogClient logClient;
+
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager,
- @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) {
+ @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager,
+ @NonNull LogClient logClient) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
- this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.nettyExecutorManager = nettyExecutorManager;
+ this.localAddress = masterConfig.getMasterAddress();
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
+ this.logClient = logClient;
}
@@ -233,7 +237,7 @@ public class MasterFailoverService {
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
LOGGER.info("TaskInstance failover begin kill the task related yarn job");
- ProcessUtils.killYarnJob(taskExecutionContext);
+ ProcessUtils.killYarnJob(logClient, taskExecutionContext);
}
// kill worker task, When the master failover and worker failover happened in the same time,
// the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
index 99c62f1172..a294874ff4 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
@@ -17,13 +17,16 @@
package org.apache.dolphinscheduler.server.master.service;
+import lombok.NonNull;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -37,13 +40,14 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.StopWatch;
-
+import javax.annotation.Nullable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -52,14 +56,6 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import lombok.NonNull;
-
@Service
public class WorkerFailoverService {
@@ -70,19 +66,22 @@ public class WorkerFailoverService {
private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
private final ProcessInstanceExecCacheManager cacheManager;
+ private final LogClient logClient;
private final String localAddress;
public WorkerFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
- @NonNull ProcessInstanceExecCacheManager cacheManager) {
+ @NonNull ProcessInstanceExecCacheManager cacheManager,
+ @NonNull LogClient logClient) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.cacheManager = cacheManager;
- this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
+ this.logClient = logClient;
+ this.localAddress = masterConfig.getMasterAddress();
}
/**
@@ -172,7 +171,7 @@ public class WorkerFailoverService {
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
LOGGER.info("TaskInstance failover begin kill the task related yarn job");
- ProcessUtils.killYarnJob(taskExecutionContext);
+ ProcessUtils.killYarnJob(logClient, taskExecutionContext);
}
} else {
LOGGER.info("The failover taskInstance is a master task");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java
index 42d9d14e49..c196def48a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java
@@ -63,8 +63,7 @@ public class DataQualityResultOperator {
if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) {
ProcessInstance processInstance =
- processService.findProcessInstanceDetailById(
- Integer.parseInt(String.valueOf(taskInstance.getProcessInstanceId())));
+ processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()).orElse(null);
// when the task is failure or cancel, will delete the execute result and statistics value
if (taskResponseEvent.getState().isFailure()
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index b71e267216..23f209bbdf 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -25,8 +25,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.registry.api.ConnectionState;
-import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -34,7 +32,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Arrays;
import java.util.Date;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
@@ -58,26 +56,17 @@ public class MasterRegistryClientTest {
@InjectMocks
private MasterRegistryClient masterRegistryClient;
- @Mock
- private MasterConfig masterConfig;
-
@Mock
private RegistryClient registryClient;
- @Mock
- private ScheduledExecutorService heartBeatExecutor;
-
@Mock
private ProcessService processService;
- @Mock
- private MasterConnectStrategy masterConnectStrategy;
-
@Mock
private MasterHeartBeatTask masterHeartBeatTask;
@Mock
- private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
+ private MasterConfig masterConfig;
@Before
public void before() throws Exception {
@@ -105,7 +94,7 @@ public class MasterRegistryClientTest {
taskInstance.setHost("127.0.0.1:8080");
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString()))
.willReturn(Arrays.asList(taskInstance));
- given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
+ given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(Optional.of(processInstance));
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
Server server = new Server();
server.setHost("127.0.0.1");
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
index e0c3c94714..27be919ff9 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.service;
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
-
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
@@ -38,12 +37,14 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
+import java.util.Optional;
import org.junit.Assert;
import org.junit.Before;
@@ -89,6 +90,9 @@ public class FailoverServiceTest {
@Mock
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+ @Mock
+ private LogClient logClient;
+
private static int masterPort = 5678;
private static int workerPort = 1234;
@@ -106,17 +110,20 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort);
+ testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
+ given(masterConfig.getMasterAddress()).willReturn(testMasterHost);
MasterFailoverService masterFailoverService =
- new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager);
+ new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
+ processInstanceExecCacheManager, logClient);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,
workflowExecuteThreadPool,
- cacheManager);
+ cacheManager,
+ logClient);
failoverService = new FailoverService(masterFailoverService, workerFailoverService);
- testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
String ip = testMasterHost.split(":")[0];
int port = Integer.valueOf(testMasterHost.split(":")[1]);
Assert.assertEquals(masterPort, port);
@@ -158,7 +165,8 @@ public class FailoverServiceTest {
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
given(processService.findValidTaskListByProcessId(Mockito.anyInt()))
.willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
- given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
+ given(processService.findProcessInstanceDetailById(Mockito.anyInt()))
+ .willReturn(Optional.ofNullable(processInstance));
Thread.sleep(1000);
Server masterServer = new Server();
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
index 6668f2d36c..d47d68fa3d 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
@@ -17,6 +17,17 @@
package org.apache.dolphinscheduler.remote;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
@@ -35,6 +46,8 @@ import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,112 +56,48 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.timeout.IdleStateHandler;
-
-/**
- * remoting netty client
- */
-public class NettyRemotingClient {
+public class NettyRemotingClient implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
- /**
- * client bootstrap
- */
private final Bootstrap bootstrap = new Bootstrap();
- /**
- * encoder
- */
private final NettyEncoder encoder = new NettyEncoder();
- /**
- * channels
- */
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
- /**
- * started flag
- */
private final AtomicBoolean isStarted = new AtomicBoolean(false);
- /**
- * worker group
- */
private final EventLoopGroup workerGroup;
- /**
- * client config
- */
private final NettyClientConfig clientConfig;
- /**
- * saync semaphore
- */
private final Semaphore asyncSemaphore = new Semaphore(200, true);
- /**
- * callback thread executor
- */
private final ExecutorService callbackExecutor;
- /**
- * client handler
- */
private final NettyClientHandler clientHandler;
- /**
- * response future executor
- */
private final ScheduledExecutorService responseFutureExecutor;
- /**
- * client init
- *
- * @param clientConfig client config
- */
public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
if (Epoll.isAvailable()) {
- this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
- }
- });
+ this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
} else {
- this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
- private final AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
- }
- });
+ this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
}
- this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
- new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
+ this.callbackExecutor = new ThreadPoolExecutor(
+ Constants.CPUS,
+ Constants.CPUS,
+ 1,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(1000),
+ new NamedThreadFactory("CallbackExecutor"),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
@@ -157,9 +106,6 @@ public class NettyRemotingClient {
this.start();
}
- /**
- * start
- */
private void start() {
this.bootstrap
@@ -371,9 +317,7 @@ public class NettyRemotingClient {
return null;
}
- /**
- * close
- */
+ @Override
public void close() {
if (isStarted.compareAndSet(true, false)) {
try {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
index 739cbbebe1..18ee88f696 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
@@ -17,88 +17,52 @@
package org.apache.dolphinscheduler.remote.config;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.remote.utils.Constants;
-/**
- * netty client config
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class NettyClientConfig {
/**
* worker threads,default get machine cpus
*/
+ @Builder.Default
private int workerThreads = Constants.CPUS;
/**
* whether tpc delay
*/
+ @Builder.Default
private boolean tcpNoDelay = true;
/**
* whether keep alive
*/
+ @Builder.Default
private boolean soKeepalive = true;
/**
* send buffer size
*/
+ @Builder.Default
private int sendBufferSize = 65535;
/**
* receive buffer size
*/
+ @Builder.Default
private int receiveBufferSize = 65535;
/**
* connect timeout millis
*/
+ @Builder.Default
private int connectTimeoutMillis = 3000;
- public int getWorkerThreads() {
- return workerThreads;
- }
-
- public void setWorkerThreads(int workerThreads) {
- this.workerThreads = workerThreads;
- }
-
- public boolean isTcpNoDelay() {
- return tcpNoDelay;
- }
-
- public void setTcpNoDelay(boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
-
- public boolean isSoKeepalive() {
- return soKeepalive;
- }
-
- public void setSoKeepalive(boolean soKeepalive) {
- this.soKeepalive = soKeepalive;
- }
-
- public int getSendBufferSize() {
- return sendBufferSize;
- }
-
- public void setSendBufferSize(int sendBufferSize) {
- this.sendBufferSize = sendBufferSize;
- }
-
- public int getReceiveBufferSize() {
- return receiveBufferSize;
- }
-
- public void setReceiveBufferSize(int receiveBufferSize) {
- this.receiveBufferSize = receiveBufferSize;
- }
-
- public int getConnectTimeoutMillis() {
- return connectTimeoutMillis;
- }
-
- public void setConnectTimeoutMillis(int connectTimeoutMillis) {
- this.connectTimeoutMillis = connectTimeoutMillis;
- }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index 4392d6e14a..a8361fa4ed 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -30,10 +30,11 @@ import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.log.LogClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -186,17 +187,15 @@ public class ProcessUtils {
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
- public static List<String> killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) {
+ public static @Nullable List<String> killYarnJob(@NonNull LogClient logClient,
+ @NonNull TaskExecutionContext taskExecutionContext) {
if (taskExecutionContext.getLogPath() == null) {
return Collections.emptyList();
}
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- List<String> appIds;
- try (LogClientService logClient = new LogClientService()) {
- Host host = Host.of(taskExecutionContext.getHost());
- appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
- }
+ Host host = Host.of(taskExecutionContext.getHost());
+ List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
if (CollectionUtils.isNotEmpty(appIds)) {
if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
similarity index 57%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
index 735fbab7c4..611bb49b67 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
@@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
-import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
@@ -46,43 +45,23 @@ import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
-/**
- * log client
- */
-public class LogClientService implements AutoCloseable {
+@Service
+public class LogClient implements AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
+ private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
- private final NettyClientConfig clientConfig;
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private final NettyRemotingClient client;
- private volatile boolean isRunning;
-
- /**
- * request time out
- */
private static final long LOG_REQUEST_TIMEOUT = 10 * 1000L;
- /**
- * construct client
- */
- public LogClientService() {
- this.clientConfig = new NettyClientConfig();
- this.clientConfig.setWorkerThreads(4);
- this.client = new NettyRemotingClient(clientConfig);
- this.isRunning = true;
- }
-
- /**
- * close
- */
- @Override
- public void close() {
- this.client.close();
- this.isRunning = false;
- logger.info("logger client closed");
+ public LogClient() {
+ NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ this.client = new NettyRemotingClient(nettyClientConfig);
+ logger.info("Initialized LogClientService with config: {}", nettyClientConfig);
}
/**
@@ -96,25 +75,30 @@ public class LogClientService implements AutoCloseable {
* @return log content
*/
public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
- logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path,
+ logger.info("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path,
skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
- String result = "";
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
- Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
+ Command response = client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
- RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
- response.getBody(), RollViewLogResponseCommand.class);
+ RollViewLogResponseCommand rollReviewLog =
+ JSONUtils.parseObject(response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg();
}
+ return "Roll view log response is null";
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ logger.error(
+ "Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error, the current thread has been interrupted",
+ host, port, path, skipLineNum, limit, ex);
+ return "Roll view log error: " + ex.getMessage();
} catch (Exception e) {
- logger.error("roll view log error", e);
- } finally {
- this.client.closeChannel(address);
+ logger.error("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error", host, port,
+ path, skipLineNum, limit, e);
+ return "Roll view log error: " + e.getMessage();
}
- return result;
}
/**
@@ -126,28 +110,31 @@ public class LogClientService implements AutoCloseable {
* @return log content
*/
public String viewLog(String host, int port, String path) {
- logger.info("view log path {}", path);
+ logger.info("View log from host: {}, port: {}, logPath: {}", host, port, path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
- String result = "";
final Host address = new Host(host, port);
try {
if (NetUtils.getHost().equals(host)) {
- result = LoggerUtils.readWholeFileContent(request.getPath());
+ return LoggerUtils.readWholeFileContent(request.getPath());
} else {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
- ViewLogResponseCommand viewLog = JSONUtils.parseObject(
- response.getBody(), ViewLogResponseCommand.class);
- result = viewLog.getMsg();
+ ViewLogResponseCommand viewLog =
+ JSONUtils.parseObject(response.getBody(), ViewLogResponseCommand.class);
+ return viewLog.getMsg();
}
+ return "View log response is null";
}
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ logger.error("View log from host: {}, port: {}, logPath: {} error, the current thread has been interrupted",
+ host, port, path, ex);
+ return "View log error: " + ex.getMessage();
} catch (Exception e) {
- logger.error("view log error", e);
- } finally {
- this.client.closeChannel(address);
+ logger.error("View log from host: {}, port: {}, logPath: {} error", host, port, path, e);
+ return "View log error: " + e.getMessage();
}
- return result;
}
/**
@@ -159,23 +146,28 @@ public class LogClientService implements AutoCloseable {
* @return log content bytes
*/
public byte[] getLogBytes(String host, int port, String path) {
- logger.info("log path {}", path);
+ logger.info("Get log bytes from host: {}, port: {}, logPath {}", host, port, path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
- GetLogBytesResponseCommand getLog = JSONUtils.parseObject(
- response.getBody(), GetLogBytesResponseCommand.class);
- return getLog.getData() == null ? new byte[0] : getLog.getData();
+ GetLogBytesResponseCommand getLog =
+ JSONUtils.parseObject(response.getBody(), GetLogBytesResponseCommand.class);
+ return getLog.getData() == null ? EMPTY_BYTE_ARRAY : getLog.getData();
}
+ return EMPTY_BYTE_ARRAY;
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ logger.error(
+ "Get logSize from host: {}, port: {}, logPath: {} error, the current thread has been interrupted",
+ host, port, path, ex);
+ return EMPTY_BYTE_ARRAY;
} catch (Exception e) {
- logger.error("get log size error", e);
- } finally {
- this.client.closeChannel(address);
+ logger.error("Get logSize from host: {}, port: {}, logPath: {} error", host, port, path, e);
+ return EMPTY_BYTE_ARRAY;
}
- return new byte[0];
}
/**
@@ -187,24 +179,28 @@ public class LogClientService implements AutoCloseable {
* @return remove task status
*/
public Boolean removeTaskLog(String host, int port, String path) {
- logger.info("log path {}", path);
+ logger.info("Remove task log from host: {}, port: {}, logPath {}", host, port, path);
RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
- Boolean result = false;
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
- RemoveTaskLogResponseCommand taskLogResponse = JSONUtils.parseObject(
- response.getBody(), RemoveTaskLogResponseCommand.class);
+ RemoveTaskLogResponseCommand taskLogResponse =
+ JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus();
}
+ return false;
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ logger.error(
+ "Remove task log from host: {}, port: {} logPath: {} error, the current thread has been interrupted",
+ host, port, path, ex);
+ return false;
} catch (Exception e) {
- logger.error("remove task log error", e);
- } finally {
- this.client.closeChannel(address);
+ logger.error("Remove task log from host: {}, port: {} logPath: {} error", host, port, path, e);
+ return false;
}
- return result;
}
public @Nullable List<String> getAppIds(@NonNull String host, int port,
@@ -212,26 +208,25 @@ public class LogClientService implements AutoCloseable {
logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", host, port, taskLogFilePath);
final Host workerAddress = new Host(host, port);
List<String> appIds = null;
- try {
- if (NetUtils.getHost().equals(host)) {
- appIds = new ArrayList<>(LogUtils.getAppIdsFromLogFile(taskLogFilePath));
- } else {
- final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command();
- Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT);
- if (response != null) {
- GetAppIdResponseCommand responseCommand =
- JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
- appIds = responseCommand.getAppIds();
- }
+ if (NetUtils.getHost().equals(host)) {
+ appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath);
+ } else {
+ final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command();
+ Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT);
+ if (response != null) {
+ GetAppIdResponseCommand responseCommand =
+ JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
+ appIds = responseCommand.getAppIds();
}
- } finally {
- client.closeChannel(workerAddress);
}
logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", appIds, host, port, taskLogFilePath);
return appIds;
}
- public boolean isRunning() {
- return isRunning;
+ @Override
+ public void close() {
+ this.client.close();
+ logger.info("LogClientService closed");
}
+
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
deleted file mode 100644
index f3c1078f07..0000000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.log;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * log asyc callback
- */
-public class LogPromise {
-
- private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
-
- /**
- * request unique identification
- */
- private long opaque;
-
- /**
- * start timemillis
- */
- private final long start;
-
- /**
- * timeout
- */
- private final long timeout;
-
- /**
- * latch
- */
- private final CountDownLatch latch;
-
- /**
- * result
- */
- private Object result;
-
- public LogPromise(long opaque, long timeout) {
- this.opaque = opaque;
- this.timeout = timeout;
- this.start = System.currentTimeMillis();
- this.latch = new CountDownLatch(1);
- PROMISES.put(opaque, this);
- }
-
- /**
- * notify client finish
- * @param opaque unique identification
- * @param result result
- */
- public static void notify(long opaque, Object result) {
- LogPromise promise = PROMISES.remove(opaque);
- if (promise != null) {
- promise.doCountDown(result);
- }
- }
-
- /**
- * countdown
- *
- * @param result result
- */
- private void doCountDown(Object result) {
- this.result = result;
- this.latch.countDown();
- }
-
- /**
- * whether timeout
- * @return timeout
- */
- public boolean isTimeout() {
- return System.currentTimeMillis() - start > timeout;
- }
-
- /**
- * get result
- * @return
- */
- public Object getResult() {
- try {
- latch.await(timeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
- }
- PROMISES.remove(opaque);
- return this.result;
- }
-
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a94b01e609..e11ac7820e 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.springframework.transaction.annotation.Transactional;
@@ -77,7 +78,7 @@ public interface ProcessService {
boolean verifyIsNeedCreateCommand(Command command);
- ProcessInstance findProcessInstanceDetailById(int processId);
+ Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 513f02f161..c213346451 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,21 +17,14 @@
package org.apache.dolphinscheduler.service.process;
-import static java.util.stream.Collectors.toSet;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
-import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
-
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import io.micrometer.core.annotation.Counted;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -133,13 +126,17 @@ import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
-import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
-import org.apache.commons.collections.CollectionUtils;
-
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -150,22 +147,24 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import io.micrometer.core.annotation.Counted;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
+import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
+import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
/**
* process relative dao that some mappers in this.
@@ -278,6 +277,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private CuringParamsService curingGlobalParamsService;
+ @Autowired
+ private LogClient logClient;
+
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@@ -474,8 +476,8 @@ public class ProcessServiceImpl implements ProcessService {
* @return process instance
*/
@Override
- public ProcessInstance findProcessInstanceDetailById(int processId) {
- return processInstanceMapper.queryDetailById(processId);
+ public Optional<ProcessInstance> findProcessInstanceDetailById(int processId) {
+ return Optional.ofNullable(processInstanceMapper.queryDetailById(processId));
}
/**
@@ -597,16 +599,14 @@ public class ProcessServiceImpl implements ProcessService {
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
- try (LogClientService logClient = new LogClientService()) {
- for (TaskInstance taskInstance : taskInstanceList) {
- String taskLogPath = taskInstance.getLogPath();
- if (Strings.isNullOrEmpty(taskInstance.getHost())) {
- continue;
- }
- Host host = Host.of(taskInstance.getHost());
- // remove task log from loggerserver
- logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
+ for (TaskInstance taskInstance : taskInstanceList) {
+ String taskLogPath = taskInstance.getLogPath();
+ if (Strings.isNullOrEmpty(taskInstance.getHost())) {
+ continue;
}
+ Host host = Host.of(taskInstance.getHost());
+ // remove task log from loggerserver
+ logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
}
}
@@ -749,7 +749,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
Command command,
- Map<String, String> cmdParam) throws CodeGenerateException {
+ Map<String, String> cmdParam) {
ProcessInstance processInstance = new ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
@@ -912,8 +912,8 @@ public class ProcessServiceImpl implements ProcessService {
* @param host host
* @return process instance
*/
- protected ProcessInstance constructProcessInstance(Command command,
- String host) throws CronParseException, CodeGenerateException {
+ protected @Nullable ProcessInstance constructProcessInstance(Command command,
+ String host) throws CronParseException, CodeGenerateException {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
@@ -929,7 +929,7 @@ public class ProcessServiceImpl implements ProcessService {
if (processInstanceId == 0) {
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
} else {
- processInstance = this.findProcessInstanceDetailById(processInstanceId);
+ processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) {
return null;
}
@@ -1068,7 +1068,8 @@ public class ProcessServiceImpl implements ProcessService {
*
* @return ProcessDefinition
*/
- private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
+ private @Nullable ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode,
+ Map<String, String> cmdParam) {
if (cmdParam != null) {
int processInstanceId = 0;
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
@@ -1080,7 +1081,7 @@ public class ProcessServiceImpl implements ProcessService {
}
if (processInstanceId != 0) {
- ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId);
+ ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) {
return null;
}
@@ -1174,7 +1175,8 @@ public class ProcessServiceImpl implements ProcessService {
// copy parent instance user def params to sub process..
String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
if (!Strings.isNullOrEmpty(parentInstanceId)) {
- ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
+ ProcessInstance parentInstance =
+ findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)).orElse(null);
if (parentInstance != null) {
subProcessInstance.setGlobalParams(
joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
@@ -3146,7 +3148,7 @@ public class ProcessServiceImpl implements ProcessService {
if (task == null) {
return;
}
- ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId());
+ ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
if (processInstance != null
&& (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
similarity index 82%
rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java
rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
index 6a6fb4c3bd..c0b6001303 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
@@ -40,8 +40,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({LogClientService.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class})
-public class LogClientServiceTest {
+@PrepareForTest({LogClient.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class})
+public class LogClientTest {
@Test
public void testViewLogFromLocal() {
@@ -54,8 +54,8 @@ public class LogClientServiceTest {
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11");
- LogClientService logClientService = new LogClientService();
- String log = logClientService.viewLog(localMachine, port, path);
+ LogClient logClient = new LogClient();
+ String log = logClient.viewLog(localMachine, port, path);
Assert.assertNotNull(log);
}
@@ -75,8 +75,8 @@ public class LogClientServiceTest {
command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8));
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
- LogClientService logClientService = new LogClientService();
- String log = logClientService.viewLog(localMachine, port, path);
+ LogClient logClient = new LogClient();
+ String log = logClient.viewLog(localMachine, port, path);
Assert.assertNotNull(log);
}
@@ -86,8 +86,8 @@ public class LogClientServiceTest {
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
PowerMockito.doNothing().when(remotingClient).close();
- LogClientService logClientService = new LogClientService();
- logClientService.close();
+ LogClient logClient = new LogClient();
+ logClient.close();
}
@Test
@@ -100,8 +100,8 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
- LogClientService logClientService = new LogClientService();
- String msg = logClientService.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
+ LogClient logClient = new LogClient();
+ String msg = logClient.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
Assert.assertNotNull(msg);
}
@@ -115,8 +115,8 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
- LogClientService logClientService = new LogClientService();
- byte[] logBytes = logClientService.getLogBytes("localhost", 1234, "/tmp/log");
+ LogClient logClient = new LogClient();
+ byte[] logBytes = logClient.getLogBytes("localhost", 1234, "/tmp/log");
Assert.assertNotNull(logBytes);
}
@@ -130,14 +130,9 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
- LogClientService logClientService = new LogClientService();
- Boolean status = logClientService.removeTaskLog("localhost", 1234, "/log/path");
+ LogClient logClient = new LogClient();
+ Boolean status = logClient.removeTaskLog("localhost", 1234, "/log/path");
Assert.assertTrue(status);
}
- @Test
- public void testIsRunning() {
- LogClientService logClientService = new LogClientService();
- Assert.assertTrue(logClientService.isRunning());
- }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java
index 9ee702fd20..5be423d8c6 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import java.util.Set;
+import java.util.List;
public abstract class AbstractRemoteTask extends AbstractTask {
@@ -38,7 +38,7 @@ public abstract class AbstractRemoteTask extends AbstractTask {
this.cancelApplication();
}
- public abstract Set<String> getApplicationIds() throws TaskException;
+ public abstract List<String> getApplicationIds() throws TaskException;
public abstract void cancelApplication() throws TaskException;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index e51429692f..5e3ec8fab2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -21,8 +21,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import java.util.Set;
-import java.util.regex.Matcher;
+import java.util.List;
import java.util.regex.Pattern;
/**
@@ -106,24 +105,11 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
* @return
* @throws TaskException
*/
- public Set<String> getApplicationIds() throws TaskException {
+ @Override
+ public List<String> getApplicationIds() throws TaskException {
return LogUtils.getAppIdsFromLogFile(taskRequest.getLogPath(), logger);
}
- /**
- * find app id
- *
- * @param line line
- * @return appid
- */
- protected String findAppId(String line) {
- Matcher matcher = YARN_APPLICATION_REGEX.matcher(line);
- if (matcher.find()) {
- return matcher.group();
- }
- return null;
- }
-
/**
* create command
*
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
index 0df5c3a911..c3833c071a 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
@@ -23,8 +23,10 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -42,14 +44,14 @@ public class LogUtils {
private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
- public Set<String> getAppIdsFromLogFile(@NonNull String logPath) {
+ public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
return getAppIdsFromLogFile(logPath, log);
}
- public Set<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
+ public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
File logFile = new File(logPath);
if (!logFile.exists() || !logFile.isFile()) {
- return Collections.emptySet();
+ return Collections.emptyList();
}
Set<String> appIds = new HashSet<>();
try (Stream<String> stream = Files.lines(Paths.get(logPath))) {
@@ -65,10 +67,10 @@ public class LogUtils {
}
}
});
- return appIds;
+ return new ArrayList<>(appIds);
} catch (IOException e) {
logger.error("Get appId from log file erro, logPath: {}", logPath, e);
- return Collections.emptySet();
+ return Collections.emptyList();
}
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
index bd961d3137..d72a9737bf 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
@@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.api.utils;
-import java.util.Set;
+import java.util.List;
import org.junit.Assert;
import org.junit.Test;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Lists;
public class LogUtilsTest {
@@ -31,7 +31,7 @@ public class LogUtilsTest {
@Test
public void getAppIdsFromLogFile() {
- Set<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
- Assert.assertEquals(Sets.newHashSet("application_1548381669007_1234"), appIds);
+ List<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
+ Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), appIds);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
index 51751fe10d..11269d3ac7 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
@@ -17,13 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.dinky;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.MissingNode;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -31,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
@@ -45,10 +42,13 @@ import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.MissingNode;
public class DinkyTask extends AbstractRemoteTask {
@@ -73,8 +73,8 @@ public class DinkyTask extends AbstractRemoteTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
@@ -112,20 +112,23 @@ public class DinkyTask extends AbstractRemoteTask {
if (!checkResult(jobInstanceInfoResult)) {
break;
}
- String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
+ String jobInstanceStatus =
+ jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format("%s-%s", address, taskId));
setExitStatusCode(exitStatusCode);
- logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
+ logger.info("dinky task finished with results: {}",
+ result.get(DinkyTaskConstants.API_RESULT_DATAS));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
- errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
+ errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error")
+ .asText());
finishFlag = true;
break;
default:
@@ -191,14 +194,14 @@ public class DinkyTask extends AbstractRemoteTask {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
logger.info("trying terminate dinky task, taskId: {}, address: {}, taskId: {}",
- this.taskExecutionContext.getTaskInstanceId(),
- address,
- taskId);
+ this.taskExecutionContext.getTaskInstanceId(),
+ address,
+ taskId);
cancelTask(address, taskId);
logger.warn("dinky task terminated, taskId: {}, address: {}, taskId: {}",
- this.taskExecutionContext.getTaskInstanceId(),
- address,
- taskId);
+ this.taskExecutionContext.getTaskInstanceId(),
+ address,
+ taskId);
}
private JsonNode submitTask(String address, String taskId) {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index a746185805..0b44b7c894 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -17,6 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.emr;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@@ -31,16 +40,6 @@ import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
-import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskException;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
/**
* AddJobFlowSteps task executor
*
@@ -51,10 +50,9 @@ public class EmrAddStepsTask extends AbstractEmrTask {
private String stepId;
private final HashSet<String> waitingStateSet = Sets.newHashSet(
- StepState.PENDING.toString(),
- StepState.CANCEL_PENDING.toString(),
- StepState.RUNNING.toString()
- );
+ StepState.PENDING.toString(),
+ StepState.CANCEL_PENDING.toString(),
+ StepState.RUNNING.toString());
/**
* constructor
@@ -66,8 +64,8 @@ public class EmrAddStepsTask extends AbstractEmrTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
@@ -126,13 +124,16 @@ public class EmrAddStepsTask extends AbstractEmrTask {
final AddJobFlowStepsRequest addJobFlowStepsRequest;
try {
- addJobFlowStepsRequest = objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
+ addJobFlowStepsRequest =
+ objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
} catch (JsonProcessingException e) {
throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e);
}
- // When a single task definition is associated with multiple steps, the state tracking will have high complexity.
- // Therefore, A task definition only supports the association of a single step, which can better ensure the reliability of the task state.
+ // When a single task definition is associated with multiple steps, the state tracking will have high
+ // complexity.
+ // Therefore, A task definition only supports the association of a single step, which can better ensure the
+ // reliability of the task state.
if (addJobFlowStepsRequest.getSteps().size() > 1) {
throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step");
}
@@ -178,7 +179,8 @@ public class EmrAddStepsTask extends AbstractEmrTask {
@Override
public void cancelApplication() throws TaskException {
- logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
+ logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}",
+ this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);
@@ -187,10 +189,10 @@ public class EmrAddStepsTask extends AbstractEmrTask {
}
CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList()
- .stream()
- .filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
- .findFirst()
- .orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
+ .stream()
+ .filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
+ .findFirst()
+ .orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) {
throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index 770fb9f996..38a1177555 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException;
@@ -57,8 +57,8 @@ public class EmrJobFlowTask extends AbstractEmrTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index f6d4e56815..7e63db7462 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -29,11 +29,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
public class FlinkStreamTask extends FlinkTask implements StreamTask {
@@ -99,7 +95,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
@Override
public void cancelApplication() throws TaskException {
- Set<String> appIds = getApplicationIds();
+ List<String> appIds = getApplicationIds();
if (CollectionUtils.isEmpty(appIds)) {
logger.error("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
return;
@@ -120,7 +116,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
@Override
public void savePoint() throws Exception {
- Set<String> appIds = getApplicationIds();
+ List<String> appIds = getApplicationIds();
if (CollectionUtils.isEmpty(appIds)) {
logger.warn("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
return;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index a342b2aac3..397b2bc38c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -19,30 +19,13 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -113,34 +96,6 @@ public class FlinkTask extends AbstractYarnTask {
return flinkParameters;
}
- @Override
- public Set<String> getApplicationIds() throws TaskException {
- Set<String> appIds = new HashSet<>();
-
- File file = new File(taskRequest.getLogPath());
- if (!file.exists()) {
- return appIds;
- }
-
- /*
- * analysis log? get submitted yarn application id
- */
- try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) {
- String line;
- while ((line = br.readLine()) != null) {
- String appId = findAppId(line);
- if (StringUtils.isNotEmpty(appId)) {
- appIds.add(appId);
- }
- }
- } catch (FileNotFoundException e) {
- throw new TaskException("get application id error, file not found, path:" + taskRequest.getLogPath());
- } catch (IOException e) {
- throw new TaskException("get application id error, path:" + taskRequest.getLogPath(), e);
- }
- return appIds;
- }
-
/**
* find app id
*
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 06ae0b953e..dc0aa6a357 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.hivecli;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -39,7 +38,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
public class HiveCliTask extends AbstractRemoteTask {
@@ -59,8 +57,8 @@ public class HiveCliTask extends AbstractRemoteTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 6587fddc31..a83f4081ff 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.jupyter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -41,7 +38,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
public class JupyterTask extends AbstractRemoteTask {
@@ -66,8 +64,8 @@ public class JupyterTask extends AbstractRemoteTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
index b405ae5f17..691e9abf3a 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
public class K8sTask extends AbstractK8sTask {
@@ -59,8 +59,8 @@ public class K8sTask extends AbstractK8sTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
@@ -72,7 +72,7 @@ public class K8sTask extends AbstractK8sTask {
protected String buildCommand() {
K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
- Map<String,String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
+ Map<String, String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
String namespaceName = namespace.get(NAMESPACE_NAME);
String clusterName = namespace.get(CLUSTER);
k8sTaskMainParameters.setImage(k8sTaskParameters.getImage());
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
index 177122847e..debf913431 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
@@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.pigeon;
-import org.apache.commons.collections4.CollectionUtils;
-
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -28,6 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
@@ -37,10 +36,7 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
-import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.handshake.ServerHandshake;
-import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -48,9 +44,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.stream.Collectors;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+
/**
* TIS DataX Task
**/
@@ -70,8 +68,8 @@ public class PigeonTask extends AbstractRemoteTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
@@ -103,9 +101,10 @@ public class PigeonTask extends AbstractRemoteTask {
ExecResult execState = null;
int taskId;
WebSocketClient webSocket = null;
- try (CloseableHttpClient client = HttpClients.createDefault();
- // trigger to start PIGEON dataX task
- CloseableHttpResponse response = client.execute(post)) {
+ try (
+ CloseableHttpClient client = HttpClients.createDefault();
+ // trigger to start PIGEON dataX task
+ CloseableHttpResponse response = client.execute(post)) {
triggerResult = processResponse(triggerUrl, response, BizResult.class);
if (!triggerResult.isSuccess()) {
List<String> errormsg = triggerResult.getErrormsg();
@@ -155,7 +154,8 @@ public class PigeonTask extends AbstractRemoteTask {
long costTime = System.currentTimeMillis() - startTime;
logger.info("PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}",
targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'");
- setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE);
+ setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS
+ : TaskConstants.EXIT_CODE_FAILURE);
} catch (Exception e) {
logger.error("execute PIGEON dataX faild,PIGEON task name:" + targetJobName, e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
@@ -187,15 +187,17 @@ public class PigeonTask extends AbstractRemoteTask {
logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId());
final String triggerUrl = getTriggerUrl();
- StringEntity entity = new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
+ StringEntity entity =
+ new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
CancelResult cancelResult = null;
HttpPost post = new HttpPost(triggerUrl);
addFormUrlencoded(post);
post.setEntity(entity);
- try (CloseableHttpClient client = HttpClients.createDefault();
- // trigger to start TIS dataX task
- CloseableHttpResponse response = client.execute(post)) {
+ try (
+ CloseableHttpClient client = HttpClients.createDefault();
+ // trigger to start TIS dataX task
+ CloseableHttpResponse response = client.execute(post)) {
cancelResult = processResponse(triggerUrl, response, CancelResult.class);
if (!cancelResult.isSuccess()) {
List<String> errormsg = triggerResult.getErrormsg();
@@ -229,6 +231,7 @@ public class PigeonTask extends AbstractRemoteTask {
final String applyURI = config.getJobLogsFetchUrl(tisHost, dataXName, taskId);
logger.info("apply ws connection,uri:{}", applyURI);
WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) {
+
@Override
public void onOpen(ServerHandshake handshakedata) {
logger.info("start to receive remote execute log");
@@ -254,7 +257,8 @@ public class PigeonTask extends AbstractRemoteTask {
return webSocketClient;
}
- private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response, Class<T> clazz) throws Exception {
+ private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response,
+ Class<T> clazz) throws Exception {
StatusLine resStatus = response.getStatusLine();
if (HttpURLConnection.HTTP_OK != resStatus.getStatusCode()) {
throw new IllegalStateException("request server " + applyUrl + " faild:" + resStatus.getReasonPhrase());
@@ -272,6 +276,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class CancelResult extends AjaxResult<Object> {
+
private Object bizresult;
@Override
@@ -285,6 +290,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class BizResult extends AjaxResult<TriggerBuildResult> {
+
private TriggerBuildResult bizresult;
@Override
@@ -302,6 +308,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class StatusResult extends AjaxResult<Map> {
+
private Map bizresult;
@Override
@@ -351,6 +358,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class TriggerBuildResult {
+
private int taskid;
public int getTaskid() {
@@ -387,6 +395,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class ExecLog {
+
private String logType;
private String msg;
private int taskId;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
index 22c898d355..8299f85cd2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -34,8 +34,8 @@ import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
@@ -73,8 +73,8 @@ public class SagemakerTask extends AbstractRemoteTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index b0e8aa5454..3d896adc58 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -17,11 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.BooleanUtils;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -33,6 +32,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
+
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -42,10 +44,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
/**
* seatunnel task
@@ -82,8 +80,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
@Override
@@ -180,11 +178,13 @@ public class SeatunnelTask extends AbstractRemoteTask {
}
private String buildConfigFilePath() {
- return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+ return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(),
+ taskExecutionContext.getTaskAppId());
}
private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {
- logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
+ logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(),
+ taskExecutionContext.getExecutePath());
if (!Files.exists(Paths.get(scriptFile))) {
logger.info("generate script file:{}", scriptFile);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index 75b6155587..5c0473154e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -17,11 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import kong.unirest.Unirest;
-
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@@ -29,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
@@ -39,7 +36,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+
+import kong.unirest.Unirest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
public class ZeppelinTask extends AbstractRemoteTask {
@@ -235,8 +235,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
}
@Override
- public Set<String> getApplicationIds() throws TaskException {
- return Collections.emptySet();
+ public List<String> getApplicationIds() throws TaskException {
+ return Collections.emptyList();
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 927920360a..ed351c3b86 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -25,6 +26,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -42,12 +44,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
-import org.apache.dolphinscheduler.service.log.LogClientService;
-
-import org.apache.commons.collections.CollectionUtils;
-
-
-
+import org.apache.dolphinscheduler.service.log.LogClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -65,15 +62,15 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
- /**
- * task execute manager
- */
@Autowired
private WorkerManagerThread workerManager;
@Autowired
private MessageRetryRunner messageRetryRunner;
+ @Autowired
+ private LogClient logClient;
+
/**
* task kill process
*
@@ -92,12 +89,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logger.info("task kill command : {}", killCommand);
int taskInstanceId = killCommand.getTaskInstanceId();
- TaskExecutionContext taskExecutionContext =
- TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
- if (taskExecutionContext == null) {
- logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
- return;
- }
+ try {
+ LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
+ TaskExecutionContext taskExecutionContext =
+ TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+ if (taskExecutionContext == null) {
+ logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
+ return;
+ }
int processId = taskExecutionContext.getProcessId();
if (processId == 0) {
@@ -110,19 +109,22 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return;
}
- // if processId > 0, it should call cancelApplication to cancel remote application too.
- this.cancelApplication(taskInstanceId);
- Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
+ // if processId > 0, it should call cancelApplication to cancel remote application too.
+ this.cancelApplication(taskInstanceId);
+ Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext);
- TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
- logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
+ logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
+ } finally {
+ LoggerUtils.removeTaskInstanceIdMDC();
+ }
}
private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
@@ -228,10 +230,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
host, logPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList());
}
- try (LogClientService logClient = new LogClientService()) {
+ try {
logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath);
List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath);
if (CollectionUtils.isEmpty(appIds)) {
+ logger.info("The appId is empty");
return Pair.of(true, Collections.emptyList());
}
@@ -241,7 +244,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
Thread.currentThread().interrupt();
logger.error("kill yarn job error, the current thread has been interrtpted", e);
} catch (Exception e) {
- logger.error("kill yarn job error", e);
+ logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, executePath, tenantCode, e);
}
return Pair.of(false, Collections.emptyList());
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
index 275fdf113d..9cb1a1e4f4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
+import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.Constants;
@@ -35,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
+import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -50,6 +52,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Date;
+import java.util.List;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
@@ -120,7 +123,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
if (task != null) {
try {
task.cancel();
- ProcessUtils.killYarnJob(taskExecutionContext);
+ List<String> appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath());
+ if (CollectionUtils.isNotEmpty(appIds)) {
+ ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
+ }
} catch (Exception e) {
logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
}