You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/02 13:30:16 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: master
add kill task logic (#2058)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new bb3885c master add kill task logic (#2058)
bb3885c is described below
commit bb3885cfe29a0f916ae1468f6403d80551ab91f6
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Mon Mar 2 21:30:04 2020 +0800
master add kill task logic (#2058)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
---
.../dolphinscheduler/common/enums/DbType.java | 10 ++
.../remote/command/KillTaskRequestCommand.java | 2 +-
.../builder/TaskExecutionContextBuilder.java | 7 +-
.../server/entity/DataxTaskExecutionContext.java | 116 ++++++++++++++++++++
.../server/entity/SQLTaskExecutionContext.java | 64 +++++++++++
.../server}/entity/TaskExecutionContext.java | 37 ++++++-
.../server/master/MasterServer.java | 2 +
.../master/cache/TaskInstanceCacheManager.java | 2 +-
.../cache/impl/TaskInstanceCacheManagerImpl.java | 2 +-
.../server/master/dispatch/ExecutorDispatcher.java | 4 +-
.../master/dispatch/context/ExecutionContext.java | 2 +-
.../dispatch/executor/AbstractExecutorManager.java | 1 +
.../master/dispatch/executor/ExecutorManager.java | 4 +-
.../dispatch/executor/NettyExecutorManager.java | 6 +-
.../master/dispatch/executor/NettyKillManager.java | 119 +++++++++++++++++++++
.../host/assign/LowerWeightRoundRobin.java | 6 ++
.../server/master/future/TaskFuture.java | 2 +-
...ocessor.java => TaskKillResponseProcessor.java} | 32 ++----
.../master/processor/TaskResponseProcessor.java | 2 +
.../server/master/registry/MasterRegistry.java | 3 +-
.../master/runner/MasterBaseTaskExecThread.java | 4 +-
.../server/master/runner/MasterExecThread.java | 5 +-
.../server/master/runner/MasterTaskExecThread.java | 42 +++++---
.../dolphinscheduler/server/monitor/Monitor.java | 5 +
.../server/registry/ZookeeperNodeManager.java | 6 +-
.../server/registry/ZookeeperRegistryCenter.java | 6 +-
.../dolphinscheduler/server/utils/ParamUtils.java | 3 +-
.../server/utils/ProcessUtils.java | 3 +-
.../cache/TaskExecutionContextCacheManager.java | 3 +-
.../impl/TaskExecutionContextCacheManagerImpl.java | 2 +-
.../worker/processor/TaskExecuteProcessor.java | 3 +-
.../server/worker/processor/TaskKillProcessor.java | 49 +++++----
.../server/worker/registry/WorkerRegistry.java | 4 +-
.../server/worker/runner/TaskExecuteThread.java | 2 +-
.../worker/task/AbstractCommandExecutor.java | 23 ++--
.../server/worker/task/AbstractTask.java | 2 +-
.../server/worker/task/AbstractYarnTask.java | 2 +-
.../server/worker/task/PythonCommandExecutor.java | 2 +-
.../server/worker/task/ShellCommandExecutor.java | 3 +-
.../server/worker/task/TaskManager.java | 2 +-
.../server/worker/task/TaskProps.java | 28 ++---
.../server/worker/task/datax/DataxTask.java | 60 +++++------
.../server/worker/task/flink/FlinkTask.java | 4 +-
.../server/worker/task/http/HttpTask.java | 12 +--
.../server/worker/task/mr/MapReduceTask.java | 2 +-
.../worker/task/processdure/ProcedureTask.java | 3 +-
.../server/worker/task/python/PythonTask.java | 5 +-
.../server/worker/task/shell/ShellTask.java | 17 ++-
.../server/worker/task/spark/SparkTask.java | 3 +-
.../server/worker/task/sql/SqlTask.java | 65 +++--------
.../dolphinscheduler/server/zk/ZKMasterClient.java | 6 +-
.../dolphinscheduler/server/zk/ZKWorkerClient.java | 2 +
.../service/process/ProcessService.java | 4 +
53 files changed, 556 insertions(+), 249 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java
index 5fb245a..cc3a295 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java
@@ -57,4 +57,14 @@ public enum DbType {
public String getDescp() {
return descp;
}
+
+
+ public static DbType of(int type){
+ for(DbType ty : values()){
+ if(ty.getCode() == type){
+ return ty;
+ }
+ }
+ throw new IllegalArgumentException("invalid type : " + type);
+ }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
index 2e87540..b8cfd89 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* kill task request command
*/
public class KillTaskRequestCommand implements Serializable {
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* processId
*/
private int processId;
/**
* host
*/
private String host;
/**
* tenantCode
*/
private String tenantCode;
/**
* logPath
*/
private String logPath;
/**
* executePath
*/
private String executePath;
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.KILL_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* kill task request command
*/
public class KillTaskRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
public KillTaskRequestCommand() {
}
public KillTaskRequestCommand(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.KILL_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "KillTaskRequestCommand{" +
"taskExecutionContext='" + taskExecutionContext + '\'' +
'}';
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 8cdd13e..1388e79 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -17,10 +17,11 @@
package org.apache.dolphinscheduler.server.builder;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* TaskExecutionContext builder
@@ -47,6 +48,8 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
+ taskExecutionContext.setHost(taskInstance.getHost());
+ taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
return this;
}
@@ -54,7 +57,7 @@ public class TaskExecutionContextBuilder {
/**
* build processInstance related info
*
- * @param processInstance
+ * @param processInstance processInstance
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(ProcessInstance processInstance){
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
new file mode 100644
index 0000000..dd8d646
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.entity;
+
+import java.io.Serializable;
+
+/**
+ * master/worker task transport
+ */
+public class DataxTaskExecutionContext implements Serializable{
+
+ /**
+ * dataSourceId
+ */
+ private int dataSourceId;
+
+ /**
+ * sourcetype
+ */
+ private int sourcetype;
+
+ /**
+ * sourceConnectionParams
+ */
+ private String sourceConnectionParams;
+
+ /**
+ * dataTargetId
+ */
+ private int dataTargetId;
+
+ /**
+ * targetType
+ */
+ private int targetType;
+
+ /**
+ * targetConnectionParams
+ */
+ private String targetConnectionParams;
+
+ public int getDataSourceId() {
+ return dataSourceId;
+ }
+
+ public void setDataSourceId(int dataSourceId) {
+ this.dataSourceId = dataSourceId;
+ }
+
+ public int getSourcetype() {
+ return sourcetype;
+ }
+
+ public void setSourcetype(int sourcetype) {
+ this.sourcetype = sourcetype;
+ }
+
+ public String getSourceConnectionParams() {
+ return sourceConnectionParams;
+ }
+
+ public void setSourceConnectionParams(String sourceConnectionParams) {
+ this.sourceConnectionParams = sourceConnectionParams;
+ }
+
+ public int getDataTargetId() {
+ return dataTargetId;
+ }
+
+ public void setDataTargetId(int dataTargetId) {
+ this.dataTargetId = dataTargetId;
+ }
+
+ public int getTargetType() {
+ return targetType;
+ }
+
+ public void setTargetType(int targetType) {
+ this.targetType = targetType;
+ }
+
+ public String getTargetConnectionParams() {
+ return targetConnectionParams;
+ }
+
+ public void setTargetConnectionParams(String targetConnectionParams) {
+ this.targetConnectionParams = targetConnectionParams;
+ }
+
+ @Override
+ public String toString() {
+ return "DataxTaskExecutionContext{" +
+ "dataSourceId=" + dataSourceId +
+ ", sourcetype=" + sourcetype +
+ ", sourceConnectionParams='" + sourceConnectionParams + '\'' +
+ ", dataTargetId=" + dataTargetId +
+ ", targetType=" + targetType +
+ ", targetConnectionParams='" + targetConnectionParams + '\'' +
+ '}';
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
new file mode 100644
index 0000000..b1ec20d
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.entity;
+
+import org.apache.dolphinscheduler.dao.entity.UdfFunc;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * SQL Task ExecutionContext
+ */
+public class SQLTaskExecutionContext implements Serializable {
+
+
+ /**
+ * warningGroupId
+ */
+ private int warningGroupId;
+ /**
+ * udf function list
+ */
+ private List<UdfFunc> udfFuncList;
+
+
+ public int getWarningGroupId() {
+ return warningGroupId;
+ }
+
+ public void setWarningGroupId(int warningGroupId) {
+ this.warningGroupId = warningGroupId;
+ }
+
+ public List<UdfFunc> getUdfFuncList() {
+ return udfFuncList;
+ }
+
+ public void setUdfFuncList(List<UdfFunc> udfFuncList) {
+ this.udfFuncList = udfFuncList;
+ }
+
+ @Override
+ public String toString() {
+ return "SQLTaskExecutionContext{" +
+ "warningGroupId=" + warningGroupId +
+ ", udfFuncList=" + udfFuncList +
+ '}';
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
similarity index 89%
rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 3ed71e5..fb3aab9 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -15,12 +15,10 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.remote.entity;
+package org.apache.dolphinscheduler.server.entity;
import java.io.Serializable;
import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
/**
@@ -144,7 +142,6 @@ public class TaskExecutionContext implements Serializable{
*/
private Map<String, String> definedParams;
-
/**
* task AppId
*/
@@ -165,6 +162,20 @@ public class TaskExecutionContext implements Serializable{
*/
private String workerGroup;
+
+ /**
+ * sql TaskExecutionContext
+ */
+ private SQLTaskExecutionContext sqlTaskExecutionContext;
+
+ /**
+ * datax TaskExecutionContext
+ */
+ private DataxTaskExecutionContext dataxTaskExecutionContext;
+
+
+
+
public String getWorkerGroup() {
return workerGroup;
}
@@ -373,6 +384,21 @@ public class TaskExecutionContext implements Serializable{
this.appIds = appIds;
}
+ public SQLTaskExecutionContext getSqlTaskExecutionContext() {
+ return sqlTaskExecutionContext;
+ }
+
+ public void setSqlTaskExecutionContext(SQLTaskExecutionContext sqlTaskExecutionContext) {
+ this.sqlTaskExecutionContext = sqlTaskExecutionContext;
+ }
+
+ public DataxTaskExecutionContext getDataxTaskExecutionContext() {
+ return dataxTaskExecutionContext;
+ }
+
+ public void setDataxTaskExecutionContext(DataxTaskExecutionContext dataxTaskExecutionContext) {
+ this.dataxTaskExecutionContext = dataxTaskExecutionContext;
+ }
@Override
public String toString() {
@@ -402,6 +428,9 @@ public class TaskExecutionContext implements Serializable{
", taskAppId='" + taskAppId + '\'' +
", taskTimeoutStrategy=" + taskTimeoutStrategy +
", taskTimeout=" + taskTimeout +
+ ", workerGroup='" + workerGroup + '\'' +
+ ", sqlTaskExecutionContext=" + sqlTaskExecutionContext +
+ ", dataxTaskExecutionContext=" + dataxTaskExecutionContext +
'}';
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 7c33b90..4c0c3e8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
+import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
@@ -127,6 +128,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
//
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
index 98d2a24..a62ee49 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* task instance state manager
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
index 6624eeb..dc775d8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
@@ -20,7 +20,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
index c597dc1..df563a6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
@@ -67,8 +67,10 @@ public class ExecutorDispatcher implements InitializingBean {
}
/**
- * task dispatch
+ * task dispatch
+ *
* @param context context
+ * @return result
* @throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
index 5157dd2..19124d3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch.context;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
index c0be5a8..9e4c222 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -27,6 +27,7 @@ public abstract class AbstractExecutorManager<T> implements ExecutorManager<T>{
/**
* before execute , add time monitor , timeout
+ *
* @param context context
* @throws ExecuteException
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
index 9b0b9af..f1707df 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -27,6 +27,7 @@ public interface ExecutorManager<T> {
/**
* before execute
+ *
* @param executeContext executeContext
* @throws ExecuteException
*/
@@ -35,12 +36,13 @@ public interface ExecutorManager<T> {
/**
* execute task
* @param context context
+ * @return T
* @throws ExecuteException
*/
T execute(ExecutionContext context) throws ExecuteException;
/**
- * after execute
+ * after execute
* @param context context
* @throws ExecuteException
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index f4b1dab..544a958 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
@@ -72,9 +72,11 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
}
+
/**
- * execute logic
+ * execute logic
* @param context context
+ * @return result
* @throws ExecuteException
*/
@Override
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java
new file mode 100644
index 0000000..54d0022
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * netty kill manager : sends kill task commands to the context host through the netty client
+ */
+@Service
+public class NettyKillManager extends AbstractExecutorManager<Boolean>{
+
+    private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class);
+    /**
+     * netty remote client
+     */
+    private final NettyRemotingClient nettyRemotingClient;
+
+    public NettyKillManager(){
+        final NettyClientConfig clientConfig = new NettyClientConfig();
+        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+        /**
+         * register KILL_TASK_RESPONSE command type TaskKillResponseProcessor
+         */
+        this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
+    }
+
+    /**
+     * build the kill command from the context and send it to the context host
+     *
+     * @param context context
+     * @return true if the command was sent, false if sending failed (the failure is logged)
+     * @throws ExecuteException
+     */
+    @Override
+    public Boolean execute(ExecutionContext context) throws ExecuteException {
+        Host host = context.getHost();
+        Command command = buildCommand(context);
+        try {
+            doExecute(host, command);
+            return true;
+        }catch (ExecuteException ex) {
+            logger.error(String.format("execute context : %s error", context.getContext()), ex);
+            return false;
+        }
+    }
+
+    /** build the kill request command carrying the JSON serialized TaskExecutionContext */
+    private Command buildCommand(ExecutionContext context) {
+        KillTaskRequestCommand requestCommand = new KillTaskRequestCommand();
+        TaskExecutionContext taskExecutionContext = context.getContext();
+        requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
+        return requestCommand.convert2Command();
+    }
+
+    /**
+     * send the command to the host, retrying when the send fails
+     * @param host host
+     * @param command command
+     * @throws ExecuteException if every attempt fails or the thread is interrupted while waiting
+     */
+    private void doExecute(final Host host, final Command command) throws ExecuteException {
+        // one initial attempt plus up to 3 retries, 100 ms apart
+        int retryCount = 3;
+        boolean success = false;
+        do {
+            try {
+                nettyRemotingClient.send(host, command);
+                success = true;
+            } catch (Exception ex) {
+                logger.error(String.format("send command : %s to %s error", command, host), ex);
+                retryCount--;
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException interrupted) {
+                    // restore the interrupt flag and stop retrying instead of swallowing it
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        } while (retryCount >= 0 && !success);
+        if (!success) {
+            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
+        }
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
index cadf418..bdf0f41 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
@@ -24,6 +24,12 @@ import java.util.Collection;
*/
public class LowerWeightRoundRobin implements Selector<HostWeight>{
+ /**
+ * select
+ * @param sources sources
+ * @return HostWeight
+ */
+ @Override
public HostWeight select(Collection<HostWeight> sources){
int totalWeight = 0;
int lowWeight = 0;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
index 0c6d740..d22c6f2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/future/TaskFuture.java
@@ -68,7 +68,7 @@ public class TaskFuture {
}
/**
- * wait for response
+ * wait for response
* @return command
* @throws InterruptedException
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
similarity index 64%
copy from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
copy to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index d6279c6..d6c3f69 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -23,36 +23,23 @@ import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
+import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* task response processor
*/
-public class TaskResponseProcessor implements NettyRequestProcessor {
+public class TaskKillResponseProcessor implements NettyRequestProcessor {
- private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class);
-
- /**
- * process service
- */
- private final ProcessService processService;
-
- /**
- * taskInstance cache manager
- */
- private final TaskInstanceCacheManager taskInstanceCacheManager;
-
- public TaskResponseProcessor(){
- this.processService = SpringApplicationContext.getBean(ProcessService.class);
- this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
- }
+ private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class);
/**
* task final result response
@@ -63,16 +50,11 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
- Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
+ Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
- ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
+ KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class);
logger.info("received command : {}", responseCommand);
-
- taskInstanceCacheManager.cacheTaskInstance(responseCommand);
-
- processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
- responseCommand.getEndTime(),
- responseCommand.getTaskInstanceId());
+ logger.info("已经接受到了worker杀任务的回应");
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index d6279c6..ed76153 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -72,6 +72,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
+ responseCommand.getProcessId(),
+ responseCommand.getAppIds(),
responseCommand.getTaskInstanceId());
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index ebfb2f4..1eb06b6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -67,9 +67,10 @@ public class MasterRegistry {
private final String startTime;
/**
- * construct
+ * construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
+ * @param heartBeatInterval heartBeatInterval
*/
public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 7812dbf..9d40de9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -147,7 +147,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* @param taskInstance taskInstance
* @return TaskExecutionContext
*/
- private TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
+ protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 576dc76..a05f8dc 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -142,8 +142,9 @@ public class MasterExecThread implements Runnable {
/**
* constructor of MasterExecThread
- * @param processInstance process instance
- * @param processService process dao
+ * @param processInstance processInstance
+ * @param processService processService
+ * @param nettyRemotingClient nettyRemotingClient
*/
public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){
this.processService = processService;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index feba5a2..07f9168 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -26,15 +26,19 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
-import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_KILL;
/**
* master task exec thread
@@ -52,6 +56,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
*/
private TaskInstanceCacheManager taskInstanceCacheManager;
+
+ private NettyKillManager nettyKillManager;
+
/**
* constructor of MasterTaskExecThread
* @param taskInstance task instance
@@ -60,6 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
super(taskInstance, processInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
+ this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class);
}
/**
@@ -78,6 +86,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
* TODO submit task instance and wait complete
+ *
* @return true is task quit is true
*/
@Override
@@ -99,14 +108,14 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
}
/**
- * TODO 在这里轮询数据库
+ * TODO polling db
*
* wait task quit
* @return true if task quit success
*/
public Boolean waitTaskQuit(){
// query new state
- taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
+ taskInstance = processService.findTaskInstanceById(taskInstance.getId());
logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
// task time out
@@ -147,7 +156,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
}
}
// updateProcessInstance task instance
- taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
+ taskInstance = processService.findTaskInstanceById(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
@@ -163,23 +172,26 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
- * TODO Kill 任务
+ * TODO Kill TASK
*
* task instance add queue , waiting worker to kill
*/
- private void cancelTaskInstance(){
+ private void cancelTaskInstance() throws Exception{
if(alreadyKilled){
return ;
}
alreadyKilled = true;
- String host = taskInstance.getHost();
- if(host == null){
- host = Constants.NULL;
- }
- String queueValue = String.format("%s-%d",
- host, taskInstance.getId());
- // TODO 这里写
- taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue);
+
+ TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
+
+ ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
+
+ Host host = new Host();
+ host.setIp(taskInstance.getHost());
+ host.setPort(12346);
+ executionContext.setHost(host);
+
+ nettyKillManager.execute(executionContext);
logger.info("master add kill task :{} id:{} to kill queue",
taskInstance.getName(), taskInstance.getId() );
@@ -197,7 +209,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
- * get remain time(s)
+ * get remain time (s)
*
* @return remain time
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java
index 3ee9488..8d7bf0b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/Monitor.java
@@ -23,6 +23,11 @@ public interface Monitor {
/**
* monitor server and restart
+ *
+ * @param masterPath masterPath
+ * @param workerPath workerPath
+ * @param port port
+ * @param installPath installPath
*/
void monitor(String masterPath, String workerPath, Integer port, String installPath);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index 25355e2..9a4a7ca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -74,7 +74,7 @@ public class ZookeeperNodeManager implements InitializingBean {
private ZookeeperRegistryCenter registryCenter;
/**
- * init listener
+ * init listener
* @throws Exception
*/
@Override
@@ -234,8 +234,8 @@ public class ZookeeperNodeManager implements InitializingBean {
/**
* get worker group nodes
- * @param workerGroup
- * @return
+ * @param workerGroup workerGroup
+ * @return worker nodes
*/
public Set<String> getWorkerGroupNodes(String workerGroup){
workerGroupLock.lock();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index a6a3ea0..b186a42 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -129,7 +129,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group directly
- * @return
+ * @return worker group nodes
*/
public Set<String> getWorkerGroupDirectly() {
List<String> workers = getChildrenKeys(getWorkerPath());
@@ -166,8 +166,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group path
- * @param workerGroup
- * @return
+ * @param workerGroup workerGroup
+ * @return worker group path
*/
public String getWorkerGroupPath(String workerGroup) {
return WORKER_PATH + "/" + workerGroup;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index 3aba546..063a7d7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -111,7 +111,8 @@ public class ParamUtils {
/**
* get parameters map
- * @return user defined params map
+ * @param definedParams definedParams
+ * @return parameters map
*/
public static Map<String,Property> getUserDefParamsMap(Map<String,String> definedParams) {
if (definedParams != null) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index c6efbae..ee1f091 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -21,9 +21,8 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.commons.io.FileUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
index db78127..7df8e01 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
@@ -17,7 +17,8 @@
package org.apache.dolphinscheduler.server.worker.cache;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* TaskExecutionContextCacheManager
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
index 584c42b..009332f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.cache.impl;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.springframework.stereotype.Service;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3c79e8c..98e4e92 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -33,13 +33,12 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 1ea7394..c910aed 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
+import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -28,9 +29,9 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
@@ -75,32 +76,35 @@ public class TaskKillProcessor implements NettyRequestProcessor {
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
+
/**
* kill task logic
*
- * @param killCommand killCommand
+ * @param context context
+ * @return execute result
*/
- private Boolean doKill(KillTaskRequestCommand killCommand){
+ private Boolean doKill(TaskExecutionContext context){
try {
- TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
+ TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(context.getTaskInstanceId());
+ context.setProcessId(taskExecutionContext.getProcessId());
Integer processId = taskExecutionContext.getProcessId();
if (processId == null || processId.equals(0)){
- logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
+ logger.error("process kill failed, process id :{}, task id:{}", processId, context.getTaskInstanceId());
return false;
}
- killCommand.setProcessId(processId);
- String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(killCommand.getProcessId()));
+ String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(context.getProcessId()));
- logger.info("process id:{}, cmd:{}", killCommand.getProcessId(), cmd);
+ logger.info("process id:{}, cmd:{}", context.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
+
// find log and kill yarn job
- killYarnJob(killCommand.getHost(), killCommand.getLogPath(), killCommand.getExecutePath(), killCommand.getTenantCode());
+ killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode());
return true;
} catch (Exception e) {
@@ -115,29 +119,37 @@ public class TaskKillProcessor implements NettyRequestProcessor {
KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
logger.info("received command : {}", killTaskRequestCommand);
- Boolean killStatus = doKill(killTaskRequestCommand);
- KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(killTaskRequestCommand,killStatus);
+ String contextJson = killTaskRequestCommand.getTaskExecutionContext();
+
+ TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
+
+ Boolean killStatus = doKill(taskExecutionContext);
+
+ killTaskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
+ new NettyRemoteChannel(channel, command.getOpaque()));
+
+ KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
killTaskCallbackService.sendKillResult(killTaskResponseCommand.getTaskInstanceId(),killTaskResponseCommand);
}
/**
* build KillTaskResponseCommand
*
- * @param killTaskRequestCommand killTaskRequestCommand
+ * @param taskExecutionContext taskExecutionContext
* @param killStatus killStatus
- * @return KillTaskResponseCommand
+ * @return the built KillTaskResponseCommand
*/
- private KillTaskResponseCommand buildKillTaskResponseCommand(KillTaskRequestCommand killTaskRequestCommand,
+ private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
Boolean killStatus) {
KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand();
- killTaskResponseCommand.setTaskInstanceId(killTaskRequestCommand.getTaskInstanceId());
- killTaskResponseCommand.setHost(killTaskRequestCommand.getHost());
+ killTaskResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
+ killTaskResponseCommand.setHost(taskExecutionContext.getHost());
killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
- killTaskResponseCommand.setProcessId(killTaskRequestCommand.getProcessId());
+ killTaskResponseCommand.setProcessId(taskExecutionContext.getProcessId());
killTaskResponseCommand.setAppIds(appIds);
- return null;
+ return killTaskResponseCommand;
}
/**
@@ -156,6 +168,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
String log = null;
try {
logClient = new LogClientService();
+ logger.info("view log host : {},logPath : {}", host,logPath);
log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
} finally {
if(logClient != null){
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 977643c..a1d5524 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -76,9 +76,11 @@ public class WorkerRegistry {
private String workerGroup;
/**
- * construct
+ * construct
+ *
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
+ * @param heartBeatInterval heartBeatInterval
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index d26feaf..d3161ec 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index e573d3a..7727115 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -21,17 +21,13 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -67,11 +63,6 @@ public abstract class AbstractCommandExecutor {
protected Consumer<List<String>> logHandler;
/**
- * timeout
- */
- protected int timeout;
-
- /**
* logger
*/
protected Logger logger;
@@ -132,6 +123,7 @@ public abstract class AbstractCommandExecutor {
/**
* task specific execution logic
+ *
* @param execCommand execCommand
* @return CommandExecuteResult
* @throws Exception
@@ -174,8 +166,6 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
- // SHELL task state
- result.setExitStatusCode(process.exitValue());
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
taskExecutionContext.getExecutePath(),
@@ -195,6 +185,9 @@ public abstract class AbstractCommandExecutor {
ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_FAILURE);
}
+
+ // SHELL task state
+ result.setExitStatusCode(process.exitValue());
return result;
}
@@ -378,7 +371,7 @@ public abstract class AbstractCommandExecutor {
List<String> appIds = new ArrayList<>();
/**
- * analysis log,get submited yarn application id
+ * analysis log，get submited yarn application id
*/
for (String log : logs) {
String appId = findAppId(log);
@@ -440,13 +433,13 @@ public abstract class AbstractCommandExecutor {
/**
- * get remain time(s)
+ * get remain time（s）
*
* @return remain time
*/
private long getRemaintime() {
long usedTime = (System.currentTimeMillis() - taskExecutionContext.getStartTime().getTime()) / 1000;
- long remainTime = timeout - usedTime;
+ long remainTime = taskExecutionContext.getTaskTimeout() - usedTime;
if (remainTime < 0) {
throw new RuntimeException("task execution time out");
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index e6dd973..86aed54 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
index 2ce397a..62e35fd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
index ad0671f..344d00f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index 877c607..6b25cd3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.commons.io.FileUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.slf4j.Logger;
import java.io.File;
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
index 468375c..1fef7e6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
index a7b66bb..483dd18 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskProps.java
@@ -129,19 +129,21 @@ public class TaskProps {
/**
* constructor
- * @param taskParams task params
- * @param taskDir task dir
- * @param scheduleTime schedule time
- * @param nodeName node name
- * @param taskType task type
- * @param taskInstanceId task instance id
- * @param envFile env file
- * @param tenantCode tenant code
- * @param queue queue
- * @param taskStartTime task start time
- * @param definedParams defined params
- * @param dependence dependence
- * @param cmdTypeIfComplement cmd type if complement
+ * @param taskParams taskParams
+ * @param scheduleTime scheduleTime
+ * @param nodeName nodeName
+ * @param taskType taskType
+ * @param taskInstanceId taskInstanceId
+ * @param envFile envFile
+ * @param tenantCode tenantCode
+ * @param queue queue
+ * @param taskStartTime taskStartTime
+ * @param definedParams definedParams
+ * @param dependence dependence
+ * @param cmdTypeIfComplement cmdTypeIfComplement
+ * @param host host
+ * @param logPath logPath
+ * @param executePath executePath
*/
public TaskProps(String taskParams,
Date scheduleTime,
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 24abe57..3e5aa51 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -39,6 +39,7 @@ import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -50,13 +51,13 @@ import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -99,11 +100,6 @@ public class DataxTask extends AbstractTask {
private DataxParameters dataXParameters;
/**
- * task dir
- */
- private String taskDir;
-
- /**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
@@ -114,11 +110,6 @@ public class DataxTask extends AbstractTask {
private TaskExecutionContext taskExecutionContext;
/**
- * processService
- */
- private ProcessService processService;
-
- /**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param logger logger
@@ -127,13 +118,9 @@ public class DataxTask extends AbstractTask {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
- logger.info("task dir : {}", taskDir);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,logger);
-
- processService = SpringApplicationContext.getBean(ProcessService.class);
-
}
/**
@@ -151,12 +138,11 @@ public class DataxTask extends AbstractTask {
/**
* run DataX process
- *
+ *
* @throws Exception
*/
@Override
- public void handle()
- throws Exception {
+ public void handle() throws Exception {
try {
// set the name of the current thread
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
@@ -180,8 +166,8 @@ public class DataxTask extends AbstractTask {
/**
* cancel DataX process
- *
- * @param cancelApplication
+ *
+ * @param cancelApplication cancelApplication
* @throws Exception
*/
@Override
@@ -200,7 +186,9 @@ public class DataxTask extends AbstractTask {
private String buildDataxJsonFile()
throws Exception {
// generate json
- String fileName = String.format("%s/%s_job.json", taskDir, taskExecutionContext.getTaskAppId());
+ String fileName = String.format("%s/%s_job.json",
+ taskExecutionContext.getExecutePath(),
+ taskExecutionContext.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
@@ -230,13 +218,14 @@ public class DataxTask extends AbstractTask {
*/
private List<JSONObject> buildDataxJobContentJson()
throws SQLException {
- DataSource dataSource = processService.findDataSourceById(dataXParameters.getDataSource());
- BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(),
- dataSource.getConnectionParams());
+ DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();
- DataSource dataTarget = processService.findDataSourceById(dataXParameters.getDataTarget());
- BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(),
- dataTarget.getConnectionParams());
+
+ BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getSourcetype()),
+ dataxTaskExecutionContext.getSourceConnectionParams());
+
+ BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getTargetType()),
+ dataxTaskExecutionContext.getTargetConnectionParams());
List<JSONObject> readerConnArr = new ArrayList<>();
JSONObject readerConn = new JSONObject();
@@ -250,7 +239,7 @@ public class DataxTask extends AbstractTask {
readerParam.put("connection", readerConnArr);
JSONObject reader = new JSONObject();
- reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType()));
+ reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
reader.put("parameter", readerParam);
List<JSONObject> writerConnArr = new ArrayList<>();
@@ -263,7 +252,9 @@ public class DataxTask extends AbstractTask {
writerParam.put("username", dataTargetCfg.getUser());
writerParam.put("password", dataTargetCfg.getPassword());
writerParam.put("column",
- parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql()));
+ parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()),
+ DbType.of(dataxTaskExecutionContext.getTargetType()),
+ dataSourceCfg, dataXParameters.getSql()));
writerParam.put("connection", writerConnArr);
if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) {
@@ -275,7 +266,7 @@ public class DataxTask extends AbstractTask {
}
JSONObject writer = new JSONObject();
- writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType()));
+ writer.put("name", DataxUtils.getWriterPluginName(DbType.of(dataxTaskExecutionContext.getTargetType())));
writer.put("parameter", writerParam);
List<JSONObject> contentList = new ArrayList<>();
@@ -348,7 +339,9 @@ public class DataxTask extends AbstractTask {
private String buildShellCommandFile(String jobConfigFilePath)
throws Exception {
// generate scripts
- String fileName = String.format("%s/%s_node.sh", taskDir, taskExecutionContext.getTaskAppId());
+ String fileName = String.format("%s/%s_node.sh",
+ taskExecutionContext.getExecutePath(),
+ taskExecutionContext.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
@@ -364,9 +357,6 @@ public class DataxTask extends AbstractTask {
sbr.append(jobConfigFilePath);
String dataxCommand = sbr.toString();
- // find process instance by task id
- ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
-
// combining local and global parameters
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index f9ef958..f264749 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -23,12 +23,10 @@ import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import java.util.ArrayList;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index d5d2bb2..74a1728 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -30,13 +30,9 @@ import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.client.config.RequestConfig;
@@ -67,10 +63,7 @@ public class HttpTask extends AbstractTask {
*/
private HttpParameters httpParameters;
- /**
- * process service
- */
- private ProcessService processService;
+
/**
* Convert mill seconds to second unit
@@ -146,7 +139,6 @@ public class HttpTask extends AbstractTask {
*/
protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException {
RequestBuilder builder = createRequestBuilder();
- ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index 3923e7c..fbc7e21 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
index 5c351d4..82cb6a2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
@@ -31,10 +31,9 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index ada6b70..7a66227 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.Map;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 68b9b04..9fa2abe 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.io.File;
@@ -55,11 +52,6 @@ public class ShellTask extends AbstractTask {
private ShellParameters shellParameters;
/**
- * task dir
- */
- private String taskDir;
-
- /**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
@@ -122,7 +114,10 @@ public class ShellTask extends AbstractTask {
*/
private String buildCommand() throws Exception {
// generate scripts
- String fileName = String.format("%s/%s_node.sh", taskDir, taskExecutionContext.getTaskAppId());
+ String fileName = String.format("%s/%s_node.sh",
+ taskExecutionContext.getExecutePath(),
+ taskExecutionContext.getTaskAppId());
+
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
@@ -148,7 +143,7 @@ public class ShellTask extends AbstractTask {
shellParameters.setRawScript(script);
logger.info("raw script : {}", shellParameters.getRawScript());
- logger.info("task dir : {}", taskDir);
+ logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index 4bb91dd..e25cffb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -24,11 +24,10 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import java.util.ArrayList;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 8775830..9a45c7d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.Constants;
@@ -33,18 +32,15 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.permission.PermissionCheck;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.sql.*;
@@ -64,22 +60,10 @@ public class SqlTask extends AbstractTask {
* sql parameters
*/
private SqlParameters sqlParameters;
-
- /**
- * process service
- */
- private ProcessService processService;
-
/**
* alert dao
*/
private AlertDao alertDao;
-
- /**
- * datasource
- */
- private DataSource dataSource;
-
/**
* base datasource
*/
@@ -102,7 +86,7 @@ public class SqlTask extends AbstractTask {
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
- this.processService = SpringApplicationContext.getBean(ProcessService.class);
+
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
}
@@ -111,6 +95,7 @@ public class SqlTask extends AbstractTask {
// set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
+
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}",
sqlParameters.getType(),
@@ -121,37 +106,15 @@ public class SqlTask extends AbstractTask {
sqlParameters.getShowType(),
sqlParameters.getConnParams());
- // not set data source
- if (sqlParameters.getDatasource() == 0){
- logger.error("datasource id not exists");
- exitStatusCode = -1;
- return;
- }
-
- dataSource= processService.findDataSourceById(sqlParameters.getDatasource());
-
- // data source is null
- if (dataSource == null){
- logger.error("datasource not exists");
- exitStatusCode = -1;
- return;
- }
-
- logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
- dataSource.getName(),
- dataSource.getType(),
- dataSource.getNote(),
- dataSource.getUserId(),
- dataSource.getConnectionParams());
-
Connection con = null;
List<String> createFuncs = null;
try {
// load class
- DataSourceFactory.loadClass(dataSource.getType());
+ DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType()));
+
// get datasource
- baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
- dataSource.getConnectionParams());
+ baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()),
+ sqlParameters.getConnParams());
// ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
@@ -175,9 +138,8 @@ public class SqlTask extends AbstractTask {
for(int i=0;i<ids.length;i++){
idsArray[i]=Integer.parseInt(ids[i]);
}
-
- List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(idsArray);
- createFuncs = UDFUtils.createFuncs(udfFuncList, taskExecutionContext.getTenantCode(), logger);
+ SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
+ createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), taskExecutionContext.getTenantCode(), logger);
}
// execute sql task
@@ -262,7 +224,7 @@ public class SqlTask extends AbstractTask {
CommonUtils.loadKerberosConf();
// if hive , load connection params if exists
- if (HIVE == dataSource.getType()) {
+ if (HIVE == DbType.valueOf(sqlParameters.getType())) {
Properties paramProp = new Properties();
paramProp.setProperty(USER, baseDataSource.getUser());
paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
@@ -387,10 +349,7 @@ public class SqlTask extends AbstractTask {
*/
public void sendAttachment(String title,String content){
- // process instance
- ProcessInstance instance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
-
- List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
+ List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
// receiving group list
List<String> receviersList = new ArrayList<String>();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 05a0790..77d2139 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -236,6 +236,8 @@ public class ZKMasterClient extends AbstractZKClient {
/**
* monitor master
+ * @param event event
+ * @param path path
*/
public void handleMasterEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
@@ -256,6 +258,8 @@ public class ZKMasterClient extends AbstractZKClient {
/**
* monitor worker
+ * @param event event
+ * @param path path
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
index 33990bc..a1d70f8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
@@ -72,6 +72,8 @@ public class ZKWorkerClient extends AbstractZKClient {
/**
* monitor worker
+ * @param event event
+ * @param path path
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index a26044e..55cd634 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1411,8 +1411,12 @@ public class ProcessService {
*/
public void changeTaskState(ExecutionStatus state,
Date endTime,
+ int processId,
+ String appIds,
int taskInstId) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
+ taskInstance.setPid(processId);
+ taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
saveTaskInstance(taskInstance);